D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clsummary
/
Filename :
cl_summary_utils.py
back
Copy
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import absolute_import, division from typing import Optional, Dict, AnyStr, Union, List from clcommon.lib.consts import DEFAULT_JWT_ES_TOKEN_PATH from clcommon.utils import process_is_running, get_cl_version from jwt import decode, exceptions import configparser import io import os import json import fcntl import struct import time from clcommon.clcaptain import mkdir as mkdir_p from cldetectlib import CL_CONFIG_FILE from secureio import write_file_via_tempfile from clconfig.cagefs_statistics_config import check_cagefs_initialized from clcommon.lib.whmapi_lib import WhmApiRequest, WhmApiError def dummy_none_function(*a, **kw): return None try: from clselect.clselectctl import interpreter_versions_short_summary from clselector.clpassenger_detectlib import is_clpassenger_active except ImportError: interpreter_versions_short_summary = dummy_none_function is_clpassenger_active = dummy_none_function _CL_STATISTICS_SECTION = "license_check" _CL_STATISTICS_COLLECT_STATE_OPTION = "cl_statistics_enabled" _CL_STATISTICS_COLLECT_RPM_STATE_OPTION = "cl_statistics_rpm_enabled" _CL_STATISTICS_DIR = '/var/lve' _CL_STATISTICS_SEND_STATUS_FILE = os.path.join(_CL_STATISTICS_DIR, 'summary_status.json') _CL_STATISTICS_LOCK_PATH = '/var/run/cloudlinux_summary.send.lock' _CL_STATISTICS_LOCK_FILE = None CL_PLUS_SENDER_FILE_PATH = '/usr/share/cloudlinux/cl_plus/clplus_sender.py' ALT_PYTHON_VIRTUALENV_BIN = '/opt/cloudlinux/venv/bin/virtualenv' class SummaryStatus(object): """ Status of both, collecting and sending statistics If process still collects statistics -> IN_PROGRESS If statistics collected and sent correctly -> SUCCESS If any error during collecting or sending -> FAILED """ SUCCESS = 'success' IN_PROGRESS = 'in_progress' FAILED = 'failed' def is_virtualenv_installed(): """ Checks is virtualenv installed :return: True/False - installed or not """ return os.path.exists(ALT_PYTHON_VIRTUALENV_BIN) def is_locked(lock_file): """ Check if file is locked by another process without acquiring lock. IMPORTANT! This function should NOT be used to check lock acquired by the same process that executes the is_locked() function. For example, when process executes fcntl.lockf(LOCK_FILE), and then the same process executes is_locked(LOCK_FILE), the is_locked(LOCK_FILE) call returns False. Use is_locked() function to check lock acquired by another process only. :param lock_file: file to check lock on :type lock_file: file object or descriptor """ lock_data = struct.pack("hhllhh", fcntl.F_WRLCK, 0, 0, 0, 0, 0) try: lock_query = fcntl.fcntl(lock_file, fcntl.F_GETLK, lock_data) lock_status = struct.unpack("hhllhh", lock_query)[0] except (OSError, IOError): # should never happen return False return lock_status != fcntl.F_UNLCK def is_sending_process_running(acquire_lock=False): """ Check if processes collecting stats are running already (with --send option in command line) :param acquire_lock: True = acquire lock when possible :type acquire_lock: bool :return bool: True = Processes are running """ global _CL_STATISTICS_LOCK_FILE _CL_STATISTICS_LOCK_FILE = open(_CL_STATISTICS_LOCK_PATH, 'w') if not acquire_lock: return is_locked(_CL_STATISTICS_LOCK_FILE) try: fcntl.lockf(_CL_STATISTICS_LOCK_FILE, fcntl.LOCK_EX | fcntl.LOCK_NB) except (OSError, IOError): return True return False def _get_config(): """ Retrieves ConfigParser object for /etc/sysconfig/cloudlinux file :return: """ config = configparser.ConfigParser(interpolation=None, strict=True) config.optionxform = str # make config case sensitive config.read(CL_CONFIG_FILE) return config def _write_config(config): """ write config to /etc/sysconfig/cloudlinux file :param config: configParser object to write return boolean: True|False """ try: string_fp = io.StringIO() config.write(string_fp) content = string_fp.getvalue() write_file_via_tempfile(content, CL_CONFIG_FILE, 0o644, prefix='cloudlinux_') except (OSError, IOError): return False return True def _get_config_value(parameter: str, default: bool = True) -> bool: """ Retrieves parameter's value from /etc/sysconfig/cloudlinux file, _CL_STATISTICS_SECTION section """ config = _get_config() res = default try: res = config.getboolean(_CL_STATISTICS_SECTION, parameter) except (configparser.NoSectionError, configparser.NoOptionError, ValueError): # Treat absent/missing value as default pass return res def _set_config_value(parameter: str, value: bool) -> None: """ Sets parameter's value to /etc/sysconfig/cloudlinux file, _CL_STATISTICS_SECTION section """ config = _get_config() config.set(_CL_STATISTICS_SECTION, parameter, str(int(value))) _write_config(config) def is_statistic_enabled() -> bool: """ Retrieves statistic collection status from /etc/sysconfig/cloudlinux file :return: True/False - enabled/disabled """ return _get_config_value(_CL_STATISTICS_COLLECT_STATE_OPTION) def is_statistic_rpm_enabled() -> bool: """ Retrieves rpm statistic collection status from /etc/sysconfig/cloudlinux file :return: True/False - enabled/disabled """ return _get_config_value(_CL_STATISTICS_COLLECT_RPM_STATE_OPTION) def set_statistic_collection_enabled(is_enabled: bool) -> None: """ Set statistic collection status to /etc/sysconfig/cloudlinux file :param is_enabled: True/False - enabled/disabled """ _set_config_value(_CL_STATISTICS_COLLECT_STATE_OPTION, is_enabled) def set_statistic_rpm_collection_enabled(is_enabled: bool) -> None: """ Set rpm statistic collection status to /etc/sysconfig/cloudlinux file :param is_enabled: True/False - enabled/disabled """ _set_config_value(_CL_STATISTICS_COLLECT_RPM_STATE_OPTION, is_enabled) def write_statistics_send_status_to_file(status_dict): """ Writes statistics send status to file /var/lve/summary_status.json :param status_dict: status dictionary for write to file :return: """ try: if not os.path.exists(_CL_STATISTICS_DIR): mkdir_p(_CL_STATISTICS_DIR) content = json.dumps(status_dict) # Write to file readable only for root write_file_via_tempfile(content, _CL_STATISTICS_SEND_STATUS_FILE, 0o600, prefix='cloudlinux_') except (OSError, IOError): pass def get_statistics_send_status_from_file(): """ Retrieves statistics send status from file /var/lve/summary_status.json :return: Dictionary with last send status. None if any error """ status_dict = None try: with open(_CL_STATISTICS_SEND_STATUS_FILE) as f: s_content = f.read() status_dict = json.loads(s_content) if status_dict['result'] == SummaryStatus.IN_PROGRESS \ and not is_sending_process_running(): # something went wrong during collection status_dict['result'] = SummaryStatus.FAILED status_dict['reason'] = 'Collecting statistics was failed. Error ' \ 'report has been sent to developers and will be fixed soon' except (OSError, IOError, ValueError, AttributeError, TypeError): pass return status_dict def installed_interpreters_list(interpreter): """ Returns list of installed interpreters :param interpreter: str - name of interpreter :rtype: List of InterpreterSummary """ return [i for i in interpreter_versions_short_summary(interpreter) if i.installed] def is_python_selector_installed(): """ Checks that python selector is installed Installed if: - ea-apache24-mod-alt-passenger or alt-mod-passenger is installed - alt-python-virtualenv is installed :rtype: bool """ return is_clpassenger_active() and is_virtualenv_installed() def is_ruby_selector_installed(): """ Checks that ruby selector is installed Installed if: - ea-apache24-mod-alt-passenger or alt-mod-passenger is installed - alt-python-virtualenv is installed :rtype: bool """ return is_clpassenger_active() and is_virtualenv_installed() def is_nodejs_selector_installed(): """ Checks that nodejs selector is installed Installed if: - ea-apache24-mod-alt-passenger or alt-mod-passenger is installed - At least one version is installed :rtype: bool """ return is_clpassenger_active() and bool(installed_interpreters_list('nodejs')) def is_php_selector_installed(): """ Checks that php selector is installed Installed if: - CageFS is initialized :rtype: bool """ return bool(check_cagefs_initialized()) def get_packages_with_lve_extensions(): """ Gets packages with set lve limits via extension """ try: result = WhmApiRequest('listpkgs').call() except WhmApiError: return [] lve_extensions_packages = [item['name'] for item in result['pkg'] if '_PACKAGE_EXTENSIONS' in item and item['_PACKAGE_EXTENSIONS'] == 'lve'] return lve_extensions_packages def get_client_data_from_jwt_token(check_expiration=True) -> Optional[Dict[AnyStr, Union[AnyStr, bool]]]: """ Gets (if any) fields cl_plus and client_id from jwt token :return: decoded jwt_token value, None if error jwt_token: result of the successful decoding """ try: with open(DEFAULT_JWT_ES_TOKEN_PATH, mode='rb') as file: file_content = file.read().strip() except (OSError, IOError): return None # JWT read success try: jwt_token = decode(file_content, algorithms=['HS256'], options={'require_exp': True, "verify_exp": check_expiration, "verify_iss": True, 'verify_signature': False}, issuer='CloudLinux') return jwt_token # JWT format error except exceptions.PyJWTError: return None def is_active_cloudlinux_license(token_data): """ Checks whether license ts is expired """ if not token_data: return None if not token_data.get('exp'): return None return int(time.time()) < int(token_data.get('exp')) def get_cl_plus_sender_status() -> Optional[AnyStr]: """ Retrieves data from status of cl_plus_sender service :return: status of service, Optional[AnyStr] - 'active' - 'inactive' """ try: result = process_is_running(CL_PLUS_SENDER_FILE_PATH, False) except FileNotFoundError: result = False return 'active' if result else 'inactive'