D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clwpos
/
optimization_features
/
Filename :
features.py
back
Copy
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import annotations import argparse import json import os import re import subprocess from pathlib import Path from typing import Dict, Optional from distutils.version import LooseVersion from clcommon.clwpos_lib import get_wp_cache_plugin from clwpos import gettext as _, constants from clwpos.cl_wpos_exceptions import WposError, WpCliCommandError from clwpos.constants import PULLZONE_DOMAIN_PROTOCOL, SMART_ADVISE_USER_UTILITY from clwpos.data_collector_utils import get_cached_php_installed_versions from clwpos.utils import run_in_cagefs_if_needed, redis_is_running from dataclasses import dataclass, field, asdict from enum import Enum from clwpos.constants import ( CL_DOC_USER_PLUGIN, CLSOP_ZIP_PATH ) from clwpos.logsetup import setup_logging from clwpos.utils import ( clear_redis_cache_config, create_redis_cache_config, litespeed_is_running ) from clwpos.php.base import PHP from clwpos.wp_utils import ( wordpress, WordpressError, is_plugin_activated, is_plugin_installed, obtain_wp_cli_env, diagnose_redis_connection_constants, is_multisite, list_active_plugins, get_plugin_data, wp_get_constant ) class PluginStatus(Enum): UNINSTALLED = 'uninstalled' ACTIVE = 'active' INACTIVE = 'inactive' @dataclass class Issue: """ Generic class for keeping compatibility/misconfiguration issues """ header: str description: str fix_tip: str context: Dict[str, str] = field(default_factory=dict) @property def dict_repr(self): return asdict(self) class UniqueId: PHP_NOT_SUPPORTED = 'PHP_NOT_SUPPORTED' PLUGIN_CONFLICT = 'PLUGIN_CONFLICT' WORDPRESS_MULTISITE_ENABLED = 'WORDPRESS_MULTISITE_ENABLED' MISCONFIGURED_WORDPRESS = 'MISCONFIGURED_WORDPRESS' WEBSERVER_NOT_SUPPORTED = 'WEBSERVER_NOT_SUPPORTED' PHP_MISCONFIGURATION = 'PHP_MISCONFIGURATION' UNCOMPATIBLE_WORDPRESS_VERSION = 'UNCOMPATIBLE_WORDPRESS_VERSION' AWP_NOT_SUPPORTS_CDN = 'AWP_NOT_SUPPORTS_CDN' AWP_NOT_SUPPORTS_IMAGE_OPTIMIZATION = 'AWP_NOT_SUPPORTS_IMAGE_OPTIMIZATION' AWP_NOT_SUPPORTS_CPCSS = 'AWP_NOT_SUPPORTS_CPCSS' NS_CDN_CONFLICT = 'NS_CDN_CONFLICT' CLOUDLINUX_MODULE_ALREADY_ENABLED = 'CLOUDLINUX_MODULE_ALREADY_ENABLED' FEATURE_HIDDEN_SERVER_WIDE = 'FEATURE_HIDDEN_SERVER_WIDE' FEATURE_NOT_MADE_VISIBLE = 'FEATURE_NOT_MADE_VISIBLE' @dataclass class CompatibilityIssue(Issue): """ For compatibility issues """ unique_id: str | None = None telemetry: Dict[str, str] = field(default_factory=dict) type: str = 'incompatibility' @property def dict_repr(self): representation = asdict(self) representation.pop('unique_id') representation.pop('telemetry') return representation @dataclass class MisconfigurationIssue(Issue): """ For misconfiguration issues """ type: str = 'misconfiguration' WP_MISCONFIGURED_FIX_TIP = _( 'Check that your website is working properly – ' 'try to run the specified command to find any obvious ' 'errors in the WordPress configuration. ' 'Otherwise, try to fix other issues first - ' 'it may help to resolve this issue as well.' ) WP_BROKEN_CORE_REPAIR_TIP = _( '1) Utilize WordPress Toolkit: If your site was deployed using ' 'WordPress Toolkit, use this tool first for repairing or restoring ' 'missing core files.\n' '2) Manually re-upload WordPress core files: As a secondary option, ' 'download the latest version of WordPress from the official website and ' 're-upload the core files to your server. Be careful not to overwrite ' 'your wp-content folder or wp-config.php file.' ) class BillableFeatureMixin: def _get_or_create_unique_identifier(self): """ Wrapper for easy mocking """ from clwpos.billing import get_unique_identifier_as_user return get_unique_identifier_as_user() class Feature(str): """ Helper class which hides differences of optimization features behind abstract methods. """ NAME = '' WP_PLUGIN_NAME = '' HAS_LICENSE_TERMS = False LICENSE_TERMS_PATH = None _logger = setup_logging(f'{NAME.lower()}_feature') def __new__(cls, *args, **kwargs): if cls != Feature: return str.__new__(cls, *args) classes = { "object_cache": _ObjectCache, # yep, site_optimization and accelerate_wp names are same thing "site_optimization": _SiteOptimization, "accelerate_wp": _SiteOptimization, 'cdn': _Cdn, 'critical_css': _CriticalCSS, 'image_optimization': _ImageOptimization } try: return classes[args[0]](*args) except KeyError: raise argparse.ArgumentTypeError(f"No such feature: {args[0]}.") @classmethod def optimization_feature(cls): return cls(cls.NAME.lower()) @classmethod def included_optimization_features(cls): return [cls.optimization_feature()] @classmethod def redis_daemon_required(cls): raise NotImplementedError @classmethod def collect_docroot_issues(cls, doc_root_info, visible_features=None): raise NotImplementedError @classmethod def is_php_supported(cls, php_version: PHP): raise NotImplementedError @classmethod def minimum_supported_wp_version(cls): raise NotImplementedError @staticmethod def collect_wordpress_issues(wordpress: Dict, docroot: str, module_is_enabled: bool): raise NotImplementedError @staticmethod def to_interface_name(): raise NotImplementedError @staticmethod def get_wp_plugin_status(wordpress_abs_path, plugin_name) -> PluginStatus: """ Get information about WordPress plugin current status. :param wordpress_abs_path: absolute path to wordpress installation :param plugin_name: name of plugin as it listed in plugins directory :return: PluginStatus """ response = Feature.get_plugin_data(wordpress_abs_path, plugin_name) # in case of missing plugin wp-cli returns empty dict if not response: return PluginStatus.UNINSTALLED # in any other case we get list of one element with parameters return PluginStatus(response[0]['status']) @staticmethod def get_plugin_data(wordpress_abs_path, plugin_name): return get_plugin_data(wordpress_abs_path, plugin_name) @staticmethod def get_plugin_version(wordpress_abs_path, plugin_name) -> str: response = Feature.get_plugin_data(wordpress_abs_path, plugin_name) # in case of missing plugin wp-cli returns empty dict if not response: raise WposError( message=_( 'Malformed plugins information received from wp-cli, ' 'unable to detect %(plugin)s version'), context={'plugin': plugin_name}, ) return response[0]['version'] @classmethod def _get_wp_plugin_compatibility_issues(cls, docroot, wordpress): """ Get issues that relates to currently installed WP plugin or None if everything is ok """ try: plugin_status = cls.get_wp_plugin_status( wordpress_abs_path=os.path.join(docroot, wordpress["path"]), plugin_name=cls.WP_PLUGIN_NAME ) except WposError as e: return CompatibilityIssue( header=_('Unexpected WordPress error'), description=_( 'Unable to detect the WordPress plugins ' 'due to unexpected error. ' '\n\n' 'Technical details:\n%(error_message)s.\n' '\nMost likely WordPress installation is not working properly.' ), fix_tip=WP_MISCONFIGURED_FIX_TIP, context=dict( error_message=e.message % e.context ), unique_id=UniqueId.MISCONFIGURED_WORDPRESS, telemetry=dict( error_message=e.message % e.context ) ) return cls._get_issues_from_wp_plugin_status(plugin_status) @classmethod def _get_issues_from_wp_plugin_status(cls, plugin_status): raise NotImplementedError @classmethod def install(cls, abs_wp_path: str): raise NotImplementedError @classmethod def enable(cls, abs_wp_path: str, *args, **kwargs): raise NotImplementedError @classmethod def disable(cls, abs_wp_path: str, **kwargs): raise NotImplementedError class _ObjectCache(Feature): """Implementation for object caching""" NAME = 'OBJECT_CACHE' WP_PLUGIN_NAME = 'redis-cache' MINIMUM_SUPPORTED_PHP_OBJECT_CACHE = '7.2' @classmethod def redis_daemon_required(cls): return True @staticmethod def to_interface_name(): return 'object_cache' @classmethod def _get_issues_from_wp_plugin_status(cls, plugin_status): """ Get issue that relates to currently installed redis-cache plugin or None if everything is ok """ if plugin_status == PluginStatus.INACTIVE: return MisconfigurationIssue( header=_('"Redis Object Cache" plugin is deactivated'), description=_('Object caching is enabled, but the ' '"Redis Object Cache" plugin is deactivated in Wordpress admin page. Caching does not work'), fix_tip=_('Activate the Redis Object Cache plugin in the Wordpress admin page and ' 'enable Object Cache Drop-in in the Redis Object Cache plugin settings. ' 'As an alternative, rollback the feature and apply it again.') ) elif plugin_status == PluginStatus.ACTIVE: return MisconfigurationIssue( header=_('The Object Cache Drop-in not installed'), description=_('The Object Cache Drop-In is not enabled. Caching does not work'), fix_tip=_('Enable the Object Cache using the Redis Object Cache plugin ' 'settings page of Wordpress Admin. ' 'As an alternative, rollback the feature and apply it again.') ) elif plugin_status == PluginStatus.UNINSTALLED: return MisconfigurationIssue( header=_('"Redis Object Cache" plugin is not installed'), description=_('The "Redis Object Cache" WordPress plugin is not installed. ' 'Caching does not work'), fix_tip=_('Rollback the feature and apply it again. ' 'Contact your administrator if the issue persists.') ) else: raise WposError(_('Unexpected plugin status: %(status)s'), context=dict(status=plugin_status)) @classmethod def _get_supported_php_versions(cls) -> list[str]: versions: list[PHP] = get_cached_php_installed_versions() return [ version.identifier for version in versions if version.is_extension_loaded('redis') ] # TODO: drop visible_features and unify _check_php_redis_extension @classmethod def collect_docroot_issues(cls, doc_root_info: dict, visible_features=tuple()): """ Collects incompatibilities related to docroot (non-supported handler, etc) for object caching. Please keep this function as small as possible and create additional functions to check specific things. """ issues = [] php_version: PHP = doc_root_info['php_version'] issues.extend( cls._check_php_redis_extension( php_version, is_feature_visible=('object_cache' in visible_features))) is_litespeed = litespeed_is_running() if not is_litespeed: issues.extend(cls._check_php_handlers(php_version, doc_root_info['php_handler'])) issues.extend( cls._check_php_extension_conflicts(php_version)) return issues @classmethod def _check_php_redis_extension(cls, php_version: PHP, is_feature_visible: bool): """ Check that vhost php configuration is compatible with this module. This includes verification of redis extensions and php version. @returns list of issues """ # FIXME: IMPORTANT NOTE ABOUT is_feature_visible # We initially added this check to hide incompatibilities in wpos-user scan # response when module is turned off (scan has filter by type): # `for issue in incompatibilities if isinstance(issue, CompatibilityIssue)` # we did that because incompatibilities were sent only once # and never updated afterwards, and redis extension is installed automatically # during first set-suite --visible. Right now this check is redundant because # we update incompatibilities once a day. Remove it once it starts causing troubles. issues = [] header__, fix_tip__, description__, uniq_id__, telemetry__ = None, None, None, None, None supported_php_versions = cls._get_supported_php_versions() if not cls.is_php_supported(php_version): header__ = _('PHP version is not supported') fix_tip__ = _('Please, set or ask your system administrator to set one of the ' 'supported PHP versions: %(compatible_versions)s') description__ = _('Non supported PHP version %(php_version)s currently is used.') uniq_id__ = UniqueId.PHP_NOT_SUPPORTED telemetry__ = dict( reason='PHP_VERSION_TOO_LOW', php_version=php_version.identifier, supported_php_versions=supported_php_versions ) elif not php_version.is_extension_installed('redis'): header = _('Redis extension is not installed for selected php version') fix_tip = _('Please, install or ask your system administrator to install redis extension ' 'for current %(php_version)s version, or use one of the compatible php versions: ' '%(compatible_versions)s for the domain.') description = _('Redis PHP extension is required for optimization feature, but not installed for ' 'selected PHP version: %(php_version)s.') # in order to create advices when module is not visible yet if not is_feature_visible: issues.append(MisconfigurationIssue( header=header, fix_tip=fix_tip, description=description, context=dict(php_version=php_version.identifier, compatible_versions=', '.join(supported_php_versions),))) else: header__ = header fix_tip__ = fix_tip description__ = description uniq_id__ = UniqueId.PHP_NOT_SUPPORTED telemetry__ = dict( php_version=php_version.identifier, reason='PHP_REDIS_NOT_INSTALLED', supported_php_versions=supported_php_versions ) elif not php_version.is_extension_loaded('redis'): header = _('Redis extension is not loaded for selected php version') fix_tip = _('Please, load or ask your system administrator to load redis extension ' 'for current %(php_version)s version, or use one of the compatible php versions: ' '%(compatible_versions)s for the domain.') description = _('Redis PHP extension is required for optimization feature, but not loaded for ' 'selected PHP version: %(php_version)s.') if not is_feature_visible: issues.append(MisconfigurationIssue( header=header, fix_tip=fix_tip, description=description, context=dict( php_version=php_version.identifier, compatible_versions=', '.join(supported_php_versions)) )) else: header__ = header fix_tip__ = fix_tip description__ = description uniq_id__ = UniqueId.PHP_NOT_SUPPORTED telemetry__ = dict( php_version=php_version.identifier, reason='PHP_REDIS_NOT_LOADED', supported_php_versions=supported_php_versions ) if not supported_php_versions: fix_tip__ = _('Please, ask your system administrator to setup at least ' 'one of the recommended PHP version in accordance with docs (%(docs_url)s).') if header__ is not None: issues.append( CompatibilityIssue( header=header__, description=description__, fix_tip=fix_tip__, context=dict(php_version=php_version.identifier, compatible_versions=', '.join(supported_php_versions), docs_url=constants.CL_DOC_USER_PLUGIN), unique_id=uniq_id__, telemetry=telemetry__ ) ) return issues @classmethod def _check_php_handlers(cls, php_version, php_handler, supported_handlers=('php-fpm', 'lsapi')): """ Check that php has handler that object caching supports. Technically, caching works with redis and so supports all php handlers, but we are not sure about performance being improved, so we made them incompatible. """ issues = [] if php_handler not in supported_handlers: issues.append( CompatibilityIssue( header=_('Unsupported PHP handler'), description=_('Website uses unsupported PHP handler. Currently supported ' 'handler(s): %(supported_handlers)s.'), fix_tip=_('Please, set or ask your system administrator to set one of the ' 'supported PHP handlers for the domain: %(supported_handlers)s. ' 'Or keep watching our blog: %(blog_url)s for supported handlers list updates.'), context={ 'supported_handlers': ", ".join(supported_handlers), 'blog_url': 'https://blog.cloudlinux.com/' }, unique_id=UniqueId.PHP_NOT_SUPPORTED, telemetry=dict( reason='PHP_UNSUPPORTED_HANDLER', handler=php_handler, supported_handlers=supported_handlers, php_version=php_version.identifier ) ) ) return issues @classmethod def _check_php_extension_conflicts(cls, php_version): """ Check that php verion that website uses does not have any conflicting php extensions. """ issues = [] incompatible_module = 'snuffleupagus' if php_version.is_extension_installed(extension=incompatible_module): issues.append( CompatibilityIssue( header=_('Unsupported PHP module is loaded'), description=_('Incompatible PHP module "%(incompatible_module)s" is currently used.'), fix_tip=_('Please, disable or remove "%(incompatible_module)s" PHP extension.'), context=dict(incompatible_module=incompatible_module), unique_id=UniqueId.PHP_NOT_SUPPORTED, telemetry=dict( php_version=php_version.identifier ) )) return issues @classmethod def is_php_supported(cls, php_version: PHP | None): """ Check if passed php version >= minimum PHP version supported by object caching. """ return LooseVersion(php_version.version) >= LooseVersion(cls.MINIMUM_SUPPORTED_PHP_OBJECT_CACHE) @classmethod def minimum_supported_wp_version(cls): return constants.MINIMUM_SUPPORTED_WP_OBJECT_CACHE @classmethod def collect_wordpress_issues(cls, wordpress: Dict, docroot: str, module_is_enabled: bool): issues = [] wp_dir = Path(docroot).joinpath(wordpress["path"]) wp_content_dir = wp_dir.joinpath("wp-content") plugin_type = "object-cache" detected_object_cache_plugin = get_wp_cache_plugin(wp_dir, plugin_type) if module_is_enabled: if detected_object_cache_plugin != "redis-cache": issue = cls._get_wp_plugin_compatibility_issues(docroot, wordpress) if issue: issues.append(issue) if not redis_is_running(): issues.append( MisconfigurationIssue( header=_('Redis is not running'), description=_('Object caching is enabled, but redis process is not running.'), fix_tip=_('Redis will start automatically in 5 minutes. ' 'If the issue persists - contact your system administrator and report this issue') ) ) try: diagnose_redis_connection_constants(docroot, wordpress['path']) except WpCliCommandError as e: issues.append( MisconfigurationIssue( header=_('Unable to identify redis constants in wordpress config'), description=_('wp-cli utility returns malformed response, reason: "%(reason)s"'), fix_tip=_('Please, try to check executed command and fix possible issues with it. ' 'If issue persists - please, contact CloudLinux support.'), context=dict( reason=e.message % e.context ) ) ) except WposError as e: issues.append( MisconfigurationIssue( header=_('Missed redis constants in site config'), description=_('WordPress config does not have needed constants ' 'for redis connection establishment.\n' 'Details: %(reason)s'), fix_tip=_('Please, try to disable and enable plugin again. ' 'If issue persists - please, contact CloudLinux support.'), context=dict( reason=e.message % e.context ) ) ) if detected_object_cache_plugin == "Unknown": drop_in_file = wp_content_dir.joinpath(f'{plugin_type}.php') issues.append( CompatibilityIssue( header=_('Conflicting object caching plugin enabled'), description=_('Unknown custom object caching plugin is already enabled'), fix_tip=_('Remove the drop-in (%s) file from the WordPress ' 'instance because it conflicts with AccelerateWP object caching.') % drop_in_file, unique_id=UniqueId.PLUGIN_CONFLICT, telemetry=dict( reason='OBJECT_CACHE_ALREADY_ENABLED', plugin=detected_object_cache_plugin ) )) elif detected_object_cache_plugin == "w3-total-cache": issues.append( CompatibilityIssue( header=_('Object Caching of W3 Total Cache plugin is incompatible'), description=_('WordPress website already has Object Caching feature enabled ' 'with caching backend configured by the the W3 Total Cache plugin.'), fix_tip=_('Deactivate Object Caching in W3 Total Cache plugin settings.'), context=dict(), unique_id=UniqueId.PLUGIN_CONFLICT, telemetry=dict( reason='OBJECT_CACHE_ALREADY_ENABLED', plugin=detected_object_cache_plugin ) )) elif detected_object_cache_plugin not in (None, "redis-cache"): issues.append( CompatibilityIssue( header=_('Conflicting object caching plugin enabled'), description=_('The "%(detected_wp_plugin)s" plugin conflicts with AccelerateWP object caching.'), fix_tip=_('Deactivate object caching in the plugin settings or completely uninstall' 'the conflicting plugin using the WordPress administration interface.'), context=dict(detected_wp_plugin=detected_object_cache_plugin), unique_id=UniqueId.PLUGIN_CONFLICT, telemetry=dict( reason='OBJECT_CACHE_ALREADY_ENABLED', plugin=detected_object_cache_plugin ) )) try: if not cls._check_installed_roc_plugin(os.path.join(docroot, wordpress['path'])): issues.append( CompatibilityIssue( header=_('Another Redis Object Cache plugin is installed'), description=_('Non CloudLinux Redis Object Cache is installed for the website'), fix_tip=_('Uninstall Redis Object Cache plugin using WordPress administration page'), unique_id=UniqueId.PLUGIN_CONFLICT, telemetry=dict( reason='OBJECT_CACHE_ALREADY_ENABLED', plugin=detected_object_cache_plugin ) )) except WpCliCommandError as e: issues.append( MisconfigurationIssue( header=_('Unable to identify installed object cache plugin in WordPress'), description=_('wp-cli utility returns malformed response, reason: "%(reason)s"'), fix_tip=_('Please, try to check executed command and fix possible issues with it. ' 'If issue persists - please, contact CloudLinux support.'), context=dict( reason=e.message % e.context ) ) ) try: multisite = is_multisite(os.path.join(docroot, wordpress["path"])) if multisite: issues.append( CompatibilityIssue( header=_('WordPress Multisite mode is enabled'), description=_('WordPress uses the Multisite mode which is currently not supported.'), fix_tip=_('Install or configure WordPress in the single-site mode.'), unique_id=UniqueId.WORDPRESS_MULTISITE_ENABLED, telemetry=dict() )) except WposError as e: issues.append( CompatibilityIssue( header=_('Unexpected WordPress error'), description=_('Unable to detect if the WordPress installation has the Multisite mode enabled ' 'mode due to unexpected error. ' '\n\n' 'Technical details:\n%(error_message)s.\n' '\nMost likely WordPress installation is not working properly.'), fix_tip=_('If this is only one issue, please check that your website is working properly – ' 'try to run the specified command to find any obvious ' 'errors in the WordPress configuration. ' 'Otherwise, try to fix other issues first - it may help to resolve this issue as well.'), context=dict( error_message=e.message % e.context ), unique_id=UniqueId.MISCONFIGURED_WORDPRESS, telemetry=dict( error_message=e.message % e.context ) )) return issues @staticmethod def _is_our_roc_plugin(wp_path: str) -> bool: """ Checks that WP's WP_REDIS_PATH is defined and contains our path """ wp_redis_path = wp_get_constant(wp_path, 'WP_REDIS_PATH', raise_exception=True) if wp_redis_path and "/.clwpos/redis.sock" in wp_redis_path: return True return False @classmethod def _check_installed_roc_plugin(cls, wp_path: str) -> bool: """ Checks that ROC plugin was installed by us, or not exists at all Returns False if the plugin was found but our config modifications was not """ if os.path.exists(os.path.join(wp_path, 'wp-content', 'plugins', 'redis-cache')): if not cls._is_our_roc_plugin(wp_path): return False return True @classmethod def install(cls, abs_wp_path: str): """ Install redis-cache plugin for user. :param abs_wp_path: absolute path to wp site :return: """ res = wordpress(abs_wp_path, "plugin", "install", cls.WP_PLUGIN_NAME) if isinstance(res, WordpressError): raise WposError(message=res.message, context=res.context) @classmethod def enable(cls, abs_wp_path: str, *args, **kwargs): """ Enable redis-cache plugin for user. :param abs_wp_path: absolute path to wp site :return: """ create_redis_cache_config(abs_wp_path) errors = [] res = wordpress(abs_wp_path, "plugin", "activate", cls.WP_PLUGIN_NAME) if isinstance(res, WordpressError): errors.append(res) if not errors: res = wordpress(abs_wp_path, "redis", "enable") if isinstance(res, WordpressError): errors.append(res) if errors: clear_redis_cache_config(abs_wp_path) raise WposError(message='Errors during enabling feature: %(error)s', context=dict(error=str(errors))) @classmethod def disable(cls, abs_wp_path: str, **kwargs): """ Delete cloudlinux info from wp-config.php, deactivate and delete redis-cache plugin for user. :param abs_wp_path: absolute path to wp site :return: list of errors that occurred during command execution """ errors = [] if is_plugin_activated(abs_wp_path, cls.WP_PLUGIN_NAME): res = wordpress(abs_wp_path, "plugin", "deactivate", cls.WP_PLUGIN_NAME) if isinstance(res, WordpressError): errors.append(res) if not errors and is_plugin_installed(abs_wp_path, cls.WP_PLUGIN_NAME): # continue procedure further only if previous step succeeded res = wordpress(abs_wp_path, "plugin", "delete", cls.WP_PLUGIN_NAME) if isinstance(res, WordpressError): errors.append(res) if not errors: # cleanup constants in the end only if deactivation/deletion succeeded, # because it may impact on deactivating/deleting plugin try: clear_redis_cache_config(abs_wp_path) except WposError as err: cls._logger.exception(err) errors.append(WordpressError(err.message, err.context)) except Exception as e: cls._logger.exception(e) errors.append( WordpressError( message=_( 'Unexpected error happened while clearing cache: %(error)s'), context=dict(error=str(e))) ) return errors class _SiteOptimization(Feature): """Implementation for site optimization feature""" NAME = 'SITE_OPTIMIZATION' WP_PLUGIN_NAME = 'clsop' WP_FEATURE_NAME = 'accelerate-wp' @classmethod def redis_daemon_required(cls): return False @staticmethod def to_interface_name(): return 'accelerate_wp' @classmethod def _get_supported_php_versions(cls): return [ php_version.identifier for php_version in get_cached_php_installed_versions() if cls.is_php_supported(php_version) ] @classmethod def collect_docroot_issues(cls, doc_root_info, visible_features=None): """ Collects incompatibilities related to docroot (non-supported handler, etc) for site optimizatin module. """ issues = [] php_version = doc_root_info['php_version'] issues.extend( cls._check_php_version(php_version)) return issues @classmethod def _check_php_version(cls, php_version: PHP): """ Checks that website's php version is compatible with this feature. @returns list of incompatibilities related to php """ issues = [] if not cls.is_php_supported(php_version): supported_php_versions = cls._get_supported_php_versions() issues.append( CompatibilityIssue( header=_('PHP version is not supported'), fix_tip=_('Please, set or ask your system administrator to set one of the ' 'supported PHP version: %(compatible_versions)s for the domain.'), description=_('Non supported PHP version %(php_version)s currently is used.'), context=dict(php_version=php_version.identifier, compatible_versions=', '.join(supported_php_versions), docs_url=CL_DOC_USER_PLUGIN), unique_id=UniqueId.PHP_NOT_SUPPORTED, telemetry=dict(reason='PHP_VERSION_TOO_LOW') ) ) return issues @staticmethod def _requirements(): with open("/opt/cloudlinux-site-optimization-module/requirements.json", "r") as f: # { # "required_php_version": "7.0", # "required_wp_version": "5.4", # "incompatible_plugins": { # "w3-total-cache": "w3-total-cache/w3-total-cache.php", # "wp-super-cache": "wp-super-cache/wp-cache.php" # } # } return json.load(f) @classmethod def incompatible_plugins(cls): return set(cls._requirements()["incompatible_plugins"].keys()) @classmethod def is_php_supported(cls, php_version: PHP): """ Check if passed php version >= minimum PHP version supported by site optimization feature. """ return LooseVersion(php_version.version) >= LooseVersion(cls._requirements()["required_php_version"]) @classmethod def minimum_supported_wp_version(cls): return cls._requirements()["required_wp_version"] @classmethod def collect_wordpress_issues(cls, wordpress_info: Dict, docroot: str, module_is_enabled: bool): issues = [] abs_wp_path = Path(docroot).joinpath(wordpress_info["path"]) wp_content_dir = abs_wp_path.joinpath("wp-content") plugin_type = "advanced-cache" detected_advanced_cache_plugin = get_wp_cache_plugin(abs_wp_path, plugin_type) try: plugins_data = list_active_plugins(str(abs_wp_path)) except WposError as e: issues.append( MisconfigurationIssue( header=_('Unable to identify module compatibility'), description=_('Malformed output received from the following command: <br> $/opt/clwpos/wp-cli plugin list --status=active --format=json' '<br><br>The raw command output is:<br> \"%(wp_cli_response)s\"'), fix_tip=_('Please, check the received command output and ensure it returns a valid JSON.'), context=dict( wp_cli_response=str(e) ) ) ) found_plugins = set() else: found_plugins = {item["name"] for item in plugins_data} result = found_plugins & cls.incompatible_plugins() if detected_advanced_cache_plugin: result.add(detected_advanced_cache_plugin) result.discard("AccelerateWP") # if our WP Rocket module is enabled it's not conflicting plugin if module_is_enabled: result.discard("WP Rocket") issue = cls._get_wp_plugin_compatibility_issues(docroot, wordpress_info) if issue: issues.append(issue) # for more beautiful output if len(result) > 1: result.discard("Unknown") result = list(result) if len(result) == 1 and result[0] == 'Unknown': drop_in_file = wp_content_dir.joinpath(f'{plugin_type}.php') issues.append( CompatibilityIssue( header=_("Conflicting advanced cache plugin enabled"), description=_("Unknown advanced cache plugin is already enabled."), fix_tip=_('Remove the drop-in (%s) file from the WordPress ' 'instance because it conflicts with AccelerateWP.') % drop_in_file, context=dict(plugins=", ".join(result)), unique_id=UniqueId.PLUGIN_CONFLICT, telemetry=dict( reason='SOM_ALREADY_ENABLED', plugin=list(result) ) ) ) elif result: issues.append( CompatibilityIssue( header=_("Conflicting plugins are enabled"), description=_("Found conflicting plugins: %(plugins)s."), fix_tip=_("Deactivate and uninstall the conflicting plugin " "using the WordPress administration interface."), context=dict(plugins=", ".join(result)), unique_id=UniqueId.PLUGIN_CONFLICT, telemetry=dict( reason='SOM_ALREADY_ENABLED', plugin=list(result) ) ) ) return issues @classmethod def _get_issues_from_wp_plugin_status(cls, plugin_status): """ Get issue that relates to currently installed redis-cache plugin or None if everything is ok """ if plugin_status == PluginStatus.INACTIVE: return MisconfigurationIssue( header=_('"AccelerateWP" plugin is deactivated'), description=_('AccelerateWP feature is enabled, but the ' '"AccelerateWP" plugin is deactivated in Wordpress admin page. Caching does not work'), fix_tip=_( 'Activate the "AccelerateWP" plugin in the Wordpress admin page. ' 'As an alternative, rollback the feature and apply it again.') ) elif plugin_status == PluginStatus.UNINSTALLED: return MisconfigurationIssue( header=_('"AccelerateWP" plugin is not installed'), description=_( 'The "AccelerateWP" WordPress plugin is not installed. ' 'Caching does not work'), fix_tip=_('Rollback the feature and apply it again. ' 'Contact your administrator if the issue persists.') ) @classmethod def install(cls, abs_wp_path: str): """ Install cloudlinux-site-optimization plugin for user. :param abs_wp_path: absolute path to wp site :return: """ env = obtain_wp_cli_env(abs_wp_path) if is_plugin_installed(abs_wp_path, 'clsop', env): return res = wordpress(abs_wp_path, "plugin", "install", CLSOP_ZIP_PATH, env=env) if isinstance(res, WordpressError): raise WposError(message=res.message, context=res.context) @classmethod def enable(cls, abs_wp_path: str, *args, **kwargs): """ Enable cloudlinux-site-optimization plugin for user. :param abs_wp_path: absolute path to wp site :return: """ env = obtain_wp_cli_env(abs_wp_path) res = wordpress(abs_wp_path, "plugin", "activate", cls.WP_PLUGIN_NAME, env=env) if isinstance(res, WordpressError): raise WposError(message=res.message, context=res.context) @classmethod def disable(cls, abs_wp_path: str, **kwargs): """ Deactivate and delete cloudlinux-site-optimization plugin for user. :param abs_wp_path: absolute path to wp site :return: list of errors that occurred during command execution """ errors = [] env = obtain_wp_cli_env(abs_wp_path) if is_plugin_activated(abs_wp_path, cls.WP_PLUGIN_NAME, env): res = wordpress(abs_wp_path, "plugin", "deactivate", cls.WP_PLUGIN_NAME, env=env) if isinstance(res, WordpressError): errors.append(res) if not errors and is_plugin_installed(abs_wp_path, cls.WP_PLUGIN_NAME, env): # continue procedure further only if previous step succeeded res = wordpress(abs_wp_path, "plugin", "uninstall", cls.WP_PLUGIN_NAME, env=env) if isinstance(res, WordpressError): errors.append(res) return errors class AWPDependentFeature(_SiteOptimization): """ Introduces basic class for features, those strictly depend on AccelerateWP plugin -- e.g. are its sub-features """ MINIMUM_AWP_PLUGIN_VERSION: str NOT_SUPPORTED_ID: str @classmethod def included_optimization_features(cls): """ Dependent feature needs AccelerateWP plugin """ return [_SiteOptimization.optimization_feature(), cls.optimization_feature()] @classmethod def is_plugin_version_supported(cls, abs_path): """ Dependent feature carries its own MINIMUM_AWP_PLUGIN_VERSION supported """ actual_version = cls.get_plugin_version(wordpress_abs_path=abs_path, plugin_name=_SiteOptimization.WP_PLUGIN_NAME) min_wp_plugin_version, min_rpm_version = cls.MINIMUM_AWP_PLUGIN_VERSION.split('-', 1) actual_version_info = actual_version.split('-', 1) # for old versions, which does not have separator "-" rpm_version = '1.1-1' wp_plugin_version = actual_version_info[0] if len(actual_version_info) == 2: rpm_version = actual_version_info[1] return LooseVersion(wp_plugin_version) >= LooseVersion(min_wp_plugin_version) \ and LooseVersion(rpm_version) >= LooseVersion(min_rpm_version) @classmethod def collect_wordpress_issues(cls, wordpress_info: Dict, docroot: str, module_is_enabled: bool): """ Collects all incompatibilities of AccelerateWP (Site Optimization plugin) + dependent plugin specific """ abs_wp_path = os.path.join(docroot, wordpress_info['path']) is_accelerate_wp_plugin_activated = is_plugin_activated(abs_wp_path, _SiteOptimization.WP_PLUGIN_NAME) issues = super().collect_wordpress_issues(wordpress_info, docroot, module_is_enabled=is_accelerate_wp_plugin_activated) if is_accelerate_wp_plugin_activated and not cls.is_plugin_version_supported(abs_wp_path): issues.append( CompatibilityIssue( header=_("Incompatible version of AccelerateWP plugin"), description=_( "Version of AccelerateWP plugin must be higher than %s") % str(cls.MINIMUM_AWP_PLUGIN_VERSION), fix_tip=_('Update AccelerateWP plugin in Wordpress Admin page'), unique_id=cls.NOT_SUPPORTED_ID, telemetry=dict( reason=cls.NOT_SUPPORTED_ID ) ) ) if module_is_enabled: issue = cls._get_wp_plugin_compatibility_issues(docroot, wordpress_info) if issue: issues.append(issue) return issues @classmethod def install(cls, abs_wp_path: str): """ Install basic AccelerateWP plugin """ env = obtain_wp_cli_env(abs_wp_path) if is_plugin_installed(abs_wp_path, _SiteOptimization.WP_PLUGIN_NAME, env): return res = wordpress(abs_wp_path, "plugin", "install", CLSOP_ZIP_PATH, env=env) if isinstance(res, WordpressError): raise WposError(message=res.message, context=res.context) @classmethod def enable(cls, abs_wp_path: str, *args, **kwargs): # enable AccelerateWP itself first _SiteOptimization.enable(abs_wp_path) res = wordpress(abs_wp_path, _SiteOptimization.WP_FEATURE_NAME, cls.WP_FEATURE_NAME, "enable", *args) if isinstance(res, WordpressError): raise WposError(message=res.message, context=res.context) @classmethod def disable(cls, abs_wp_path: str, **kwargs): """ Disables a dependent optimization feature inside accelerate-wp plugin """ env = obtain_wp_cli_env(abs_wp_path) if is_plugin_activated(abs_wp_path, _SiteOptimization.WP_PLUGIN_NAME, env): res = wordpress(abs_wp_path, _SiteOptimization.WP_FEATURE_NAME, cls.WP_FEATURE_NAME, "disable") if isinstance(res, WordpressError): raise WposError(message=res.message, context=res.context) class _Cdn(AWPDependentFeature, BillableFeatureMixin): """Implementation for CDN feature""" NAME = 'CDN' WP_FEATURE_NAME = 'cdn' MINIMUM_AWP_PLUGIN_VERSION = '3.13.4-1.1-3' NOT_SUPPORTED_ID = UniqueId.AWP_NOT_SUPPORTS_CDN # this feature has specific license terms which user should # apply before he can use the feature HAS_LICENSE_TERMS = True LICENSE_TERMS_PATH = '/opt/clwpos/agreements/cdn' KNOWN_CDN_NS = { 'cloudflare': 'Cloudflare', '.fastly.net.': 'Fastly', '.akam.net': 'Akamai', 'awsdns': 'Amazon CloudFront(AWS)', '.impervadns.net': 'Imperva', 'azure': 'Azure CDN', 'google': 'Google Cloud CDN', 'sucuri': 'Sucuri', 'cdn77': 'CDN77', 'netlify': 'Netlify' } CDN_NS_PATTERN = re.compile(f"{'|'.join(KNOWN_CDN_NS.keys())}") @staticmethod def to_interface_name(): return 'cdn' @classmethod def incompatible_plugins(cls) -> set: """ CDN incompatible plugins. - cloudflare - litespeed-cache - speed-booster-pack - w3-total-cache - wp-fastest-cache - wp-super-cache are incompatible with CDN too, but they are already listed as SOM conflicting plugins """ cdn_incompatible_plugins = { 'autoptimize', 'bunnycdn', 'cdn-enabler', 'cloudimage', 'cloudinary-image-management-and-manipulation-in-the-cloud-cdn', 'nazy-load', 'optimole-wp', 'sirv', 'image-cdn', # TODO: AWP-435 'jetpack', 'nitropack', 'shift8-cdn', 'smartvideo', 'wp-cloudflare-page-cache', 'shapepress-dsgvo', 'amazon-s3-and-cloudfront', 'wp-cdn-yes', 'aws-cdn-by-wpadmin' } return cdn_incompatible_plugins.union(super().incompatible_plugins()) @classmethod def _dig_ns(cls, domain_name: str) -> str: """Dig domain's NS""" dig_util = '/usr/bin/dig' if not os.path.isfile(dig_util): # dig is not installed, assume no NS detected return str() dig_cmd = [ dig_util, 'ns', domain_name, '+short' ] try: dig_result = subprocess.run(dig_cmd, capture_output=True, text=True) if dig_result.returncode: # dig command failed with returncode, assume no NS detected cls._logger.exception("dig domain failed with exitcode %s: \n" "stdout=%s\n" "stderr=%s", dig_result.returncode, dig_result.stdout, dig_result.stderr) return str() return dig_result.stdout.strip() except (OSError, IOError, ) as e: # subprocess failed to execute command, assume no NS detected cls._logger.exception( "Failed to dig domain, command crushed with: %s", e) return str() @classmethod def collect_docroot_issues(cls, doc_root_info, visible_features=None): """ Collects incompatibilities related to docroot for CDN module: - site optimization inherited - CDN already enabled by NS """ issues = _SiteOptimization.collect_docroot_issues( doc_root_info, visible_features) if ns_issue := cls._find_ns_issue(doc_root_info['domains'][0]): issues.append(ns_issue) return issues @classmethod def _find_ns_issue(cls, primary_domain) -> Optional[CompatibilityIssue]: """ Checks whether domain already has CDN installed or not. We do that by mathing list of common DNS provider hostnames. """ ns_cdn_detected = cls.CDN_NS_PATTERN.search( cls._dig_ns(primary_domain)) if ns_cdn_detected is not None: return CompatibilityIssue( header=_("CDN is already enabled"), description=_("Already enabled CDN found: %(cdn)s."), fix_tip=_("Deactivate the enabled CDN using " "your service provider instructions."), context=dict(cdn=cls.KNOWN_CDN_NS[ns_cdn_detected.group(0)]), unique_id=UniqueId.NS_CDN_CONFLICT, telemetry=dict( reason='CDN_ALREADY_ENABLED_BY_NS', plugin=cls.KNOWN_CDN_NS[ns_cdn_detected.group(0)] ) ) def enable(self, abs_wp_path: str, *args, **kwargs): domain = f'{PULLZONE_DOMAIN_PROTOCOL}{kwargs.get("domain")}' website = f'/{kwargs.get("website")}' skip_checkers = kwargs.get("skip_checkers", False) get_pullzone_command = [SMART_ADVISE_USER_UTILITY, 'awp-cdn-get-pullzone', '--domain', domain, '--website', website] try: output = run_in_cagefs_if_needed(get_pullzone_command, check=True) except subprocess.CalledProcessError as error: self._logger.exception("Error during obtaining pullzone: \n" "stdout=%s\n" "stderr=%s", error.stdout, error.stderr) raise WposError('Unable to obtain pullzone required for CDN optimization feature') pullzone_data = json.loads(output.stdout)['data'] additional_args = list() if skip_checkers: additional_args.append('--skip-check') super().enable(abs_wp_path, '--account_id=%s' % pullzone_data['account_id'], '--cdn_url=%s' % pullzone_data['cdn_url'], '--api_key=%s' % self._get_or_create_unique_identifier(), *additional_args) @classmethod def disable(cls, abs_wp_path: str, **kwargs): """ Disables cdn feature inside accelerate wp plugin """ domain = f'{PULLZONE_DOMAIN_PROTOCOL}{kwargs.get("domain")}' website = f'/{kwargs.get("website")}' remove_pullzone_command = [SMART_ADVISE_USER_UTILITY, 'awp-cdn-remove-pullzone', '--domain', domain, '--website', website] try: run_in_cagefs_if_needed(remove_pullzone_command, check=True) except subprocess.CalledProcessError as error: cls._logger.exception("Error during removing pullzone: \n" "stdout=%s\n" "stderr=%s", error.stdout, error.stderr) raise WposError('Unable to remove pullzone') super().disable(abs_wp_path) class _ImageOptimization(AWPDependentFeature, BillableFeatureMixin): """Implementation for image optimization feature""" NAME = 'IMAGE_OPTIMIZATION' WP_FEATURE_NAME = 'image_optimization' MINIMUM_AWP_PLUGIN_VERSION = '3.12.6.1-1.1-1' NOT_SUPPORTED_ID = UniqueId.AWP_NOT_SUPPORTS_IMAGE_OPTIMIZATION @staticmethod def to_interface_name(): return 'image_optimization' @classmethod def incompatible_plugins(cls) -> set: """ Image Optimization incompatible plugins. """ img_opt_incompatible_plugins = { 'ewww-image-optimizer', 'shortpixel-image-optimiser', 'shortpixel-adaptive-images' 'imagify', 'optimole-wp', 'wp-smushit', 'resmushit-image-optimizer', 'megaoptim-image-optimizer', 'kraken-image-optimizer', 'tiny-compress-images', 'wp-compress-image-optimizer', 'optimus', 'imsanity', # TODO: AWP-435 # jetpack skipped for now, since more smart detection required } return img_opt_incompatible_plugins.union(super().incompatible_plugins()) def enable(self, abs_wp_path: str, *args, **kwargs): super().enable(abs_wp_path, '--unique_id=%s' % self._get_or_create_unique_identifier()) class _CriticalCSS(AWPDependentFeature, BillableFeatureMixin): """Implementation for Critical Path CSS feature""" NAME = 'CPCSS' WP_FEATURE_NAME = 'cpcss' MINIMUM_AWP_PLUGIN_VERSION = '3.12.6.1-1.1-1' NOT_SUPPORTED_ID = UniqueId.AWP_NOT_SUPPORTS_CPCSS @staticmethod def to_interface_name(): return 'critical_css' @classmethod def optimization_feature(cls): return cls(cls.to_interface_name()) def enable(self, abs_wp_path: str, *args, **kwargs): super().enable(abs_wp_path, '--unique_id=%s' % self._get_or_create_unique_identifier()) OBJECT_CACHE_FEATURE = Feature("object_cache") SITE_OPTIMIZATION_FEATURE = Feature("site_optimization") CDN_FEATURE = Feature('cdn') CRITICAL_CSS_FEATURE = Feature("critical_css") IMAGE_OPTIMIZATION_FEATURE = Feature('image_optimization') ALL_OPTIMIZATION_FEATURES = [ OBJECT_CACHE_FEATURE, SITE_OPTIMIZATION_FEATURE, CDN_FEATURE, CRITICAL_CSS_FEATURE, IMAGE_OPTIMIZATION_FEATURE ]