D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clwpos
/
Filename :
data_collector_utils.py
back
Copy
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import annotations import logging import os import re from pathlib import Path from typing import List from typing_extensions import TypedDict from packaging.version import InvalidVersion, Version, parse as parse_version from clcommon.clwpos_lib import find_wp_paths, get_wp_cache_plugin from clcommon.cpapi import userdomains, get_main_username_by_uid, get_installed_php_versions, getCPName from clwpos import _ from clwpos.cl_wpos_exceptions import WposError from clwpos.daemon import WposDaemon from clwpos.php.base import PHP from clwpos.scoped_cache import cached_in_scope from clwpos.utils import ( daemon_communicate, _get_data_from_info_json ) def _get_doc_roots_info() -> dict: user = get_main_username_by_uid(os.geteuid()) result = {} for domain, doc_root in userdomains(user): result.setdefault(doc_root, []).append(domain) return result def has_wps(username: str) -> bool: """Check if user has at least one WordPress installation""" _info = userdomains(username) # all docroots (like it is done in `_add_wp_path_info`) excludes = list(i[1] for i in _info) for domain, doc_root in _info: wps = list(find_wp_paths(doc_root, excludes=excludes)) # return immediately once first WP installation found if len(wps): return True return False def _add_wp_path_info(user_info: dict) -> dict: wp_paths = {} for doc_root, domains in user_info.items(): # excludes only affects subpaths of doc_root excludes = list(user_info) item = { "domains": domains, "wp_paths": list(find_wp_paths(doc_root, excludes=excludes)), } wp_paths[doc_root] = item return wp_paths _WP_VERSION_RE = re.compile(r"""wp_version\s*=\s*(?:'|")([^"']+)(?:'|")\s*;""", re.MULTILINE) def _get_wp_info_from_version_php_code(content: str) -> str | None: """ Simply search a value of `wp_version` variable in PHP code without evaluation """ match = _WP_VERSION_RE.search(content) try: return match.group(1).strip() if match else None except IndexError: return None def _wp_info(doc_root: str, wp_path: str) -> dict: """Convert WP path to {"path": str, "version": str}""" absolute_wp_path = Path(doc_root, wp_path) version_missing_reason = None version_php = absolute_wp_path / "wp-includes/version.php" try: version_php_code = version_php.read_text("utf-8", errors="ignore") except OSError: version_raw = None version_missing_reason = _("wp-includes/version.php file is missing") else: version_raw = _get_wp_info_from_version_php_code(version_php_code) if not version_raw: version_missing_reason = _("wp-includes/version.php file is invalid") version = None if version_raw: try: version = parse_version(version_raw) except InvalidVersion: version_missing_reason = _("wp-includes/version.php contains invalid identifier") return { "path": wp_path, "version": version, "version_missing_reason": version_missing_reason, } def _add_wp_info(user_info: dict) -> dict: for doc_root, doc_root_info in user_info.items(): wp_paths = doc_root_info.pop("wp_paths") doc_root_info["wps"] = [_wp_info(doc_root, wp_path) for wp_path in wp_paths] return user_info @cached_in_scope def _php_get_vhost_versions(uid): """ uid param is used for caching each distinct uid @return: [ { "account": "rm3", "documentroot": "/home/example/public_html", "version": "ea-php72", "handler": "php-fpm", "vhost": "otherchars.rm3.tld" } ] """ try: return daemon_communicate({"command": WposDaemon.DAEMON_PHP_GET_VHOST_VERSIONS_COMMAND})["data"] except WposError: return _get_data_from_info_json("vhost_versions") def _is_php_selector_applicable(handler): """ 1. If handler is not set to lsphp -> php version selected in PHP Selector will not be applied: https://cloudlinux.zendesk.com/hc/en-us/articles/4414030658962-PHP-Selector-does-not-apply-on-a-server-with-DirectAdmin-panel DirectAdmin PHP Selector could override CloudLinux PHP selector, even loaded .ini could not give 100% So let's do not even try apply selector, if it has no sense """ if getCPName() == 'DirectAdmin' and handler != 'lsapi': return False return True class Info(TypedDict): # e.g. 'user10.com' vhost: str # e.g. 'user10' account: str # e.g. 'system-php72' version: PHP # e.g. 'php-fpm' handler: str # e.g. '/home/user10/public_html' documentroot: str def php_info() -> List[Info]: """ Returns php info, example: [{'vhost': 'sub.wposuser.com', 'account': 'stackoverflow', 'version': 'ea-php80', 'handler': 'php-fpm', 'documentroot': '/home/stackoverflow/public_html'}, ...................................................................] """ installed_php_versions = get_cached_php_installed_versions() vhosts_php_description = _php_get_vhost_versions(os.geteuid()) result = [] for vhost_data in vhosts_php_description: try: php_version = next( version for version in installed_php_versions if version.identifier == vhost_data['version']) except: raise RuntimeError("%s version is not known %s" % ( vhost_data['version'], installed_php_versions)) version_used = php_version if not _is_php_selector_applicable(vhost_data['handler']) \ else php_version.apply_php_selector() result.append(Info( vhost=vhost_data['vhost'], account=vhost_data['account'], version=version_used, handler=vhost_data['handler'], documentroot=vhost_data['documentroot'], )) return result def _add_php(user_info: dict) -> dict: """ Updates user_info dict with php data """ result = php_info() for item in result: if item["documentroot"] not in user_info: raise WposError(_("Documentroot %(documentroot)s is missing in user information. " "This might indicate that hosting account information was " "changed by the control panel during data processing. " "Usually this is not fatal, try again later and " "contact CloudLinux support if the issue persists."), context=dict(documentroot=item["documentroot"])) user_info[item["documentroot"]]["php"] = { "version": item["version"], "handler": item["handler"] } return user_info def _add_object_cache_info(user_info: dict) -> dict: """ Search for 'object-cache.php' files in 'wp-content/plugins' directory in order to find what plugin is being used for object caching. """ for doc_root, doc_root_info in user_info.items(): for wp in doc_root_info["wps"]: plugin = get_wp_cache_plugin(Path(doc_root).joinpath(wp["path"]), "object-cache") wp["object_cache"] = plugin return user_info def get_user_info() -> dict: """ Collect info about user. @return { '/home/user/public_html': { 'domains': ['domain.com'], 'wps': [ { 'path': 'wp_path_1', 'version': '5.7.2', 'object_cache': 'redis-cache' }, { 'path': 'wp_path_2', 'version': None, 'object_cache': 'redis-cache' } ], 'php': { 'version': 'ea-php74', 'handler': 'cgi', 'redis_extension': False } } } """ user_info = _get_doc_roots_info() for func in (_add_wp_path_info, _add_wp_info, _add_php): user_info = func(user_info) return user_info def _php_get_installed_versions(): """ @return: [ "ea-php74" ] """ if os.geteuid(): try: return daemon_communicate({ "command": WposDaemon.DAEMON_PHP_GET_INSTALLED_VERSIONS_COMMAND })["data"] except WposError: return _get_data_from_info_json("installed_versions") return get_installed_php_versions() @cached_in_scope def get_cached_php_installed_versions() -> List[PHP]: """ List all installed php version on the system :return: installed php version """ result = _php_get_installed_versions() installed_php_versions = [ PHP( identifier=php_description['identifier'], version=php_description['version'], modules_dir=php_description['modules_dir'], dir=php_description['dir'], bin=php_description['bin'], ini=php_description['ini'] ) for php_description in result ] return installed_php_versions