D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clwpos
/
Filename :
daemon_subscription_handler.py
back
Copy
import logging
import os
import pwd
import socket
import subprocess
import time
from dataclasses import dataclass
from typing import Optional, Dict, List

from clcommon.clpwd import drop_user_privileges

from clwpos import socket_utils
from clwpos.utils import run_in_cagefs_if_needed


@dataclass
class EnableFeatureTask:
    """A queued request to enable features (or apply an advice) for one user."""

    uid: int

    # some arguments that we need when
    # we enable module manually
    domain: str
    wp_path: str
    # the features list that we would like to enable
    feature: list
    # if true, appends --ignore-errors to enable command
    ignore_errors: bool

    # time when we give up and
    # think that waiting is unsuccessful
    timeout_timestamp: int

    # optional argument which indicates that
    # we are applying advice
    advice_id: Optional[str] = None

    # pid of the process which took the task in work
    pid: Optional[int] = None


class PendingSubscriptionWatcher:
    """
    Listen to set-suite and user events in order to automatically
    enable module when suite is purchased.
    """
    # max time (seconds, added to time.time()) that we wait
    # for the feature to be allowed
    _FEATURE_STATUS_CHECK_TIMEOUT = 180

    def __init__(self, logger=None):
        """
        :param logger: logger to report to; defaults to this module's logger.
        """
        # pending tasks to automatically enable module
        # when suite is allowed, grouped by user id
        self._pending_enable_tasks: Dict[int, List[EnableFeatureTask]] = dict()
        self._logger = logger or logging.getLogger(__name__)

    def _is_task_outdated(self, pending_task: EnableFeatureTask) -> bool:
        """
        Tell whether a task may be dropped from the pending list.

        A task is outdated once its timeout has passed and it is not
        held by a worker process that can still be signalled.
        """
        if pending_task.timeout_timestamp >= time.time():
            return False

        if not pending_task.pid:
            return True

        try:
            # signal 0 delivers nothing, it only probes the pid
            os.kill(pending_task.pid, 0)
        except OSError:
            # process is dead
            return True
        return False

    def _cleanup_pending_tasks(self):
        """
        Cleanup list of pending tasks by removing outdated.

        Users left without any task are removed from the mapping.
        """
        for uid, pending_tasks in list(self._pending_enable_tasks.items()):
            # Filter through slice assignment: removing items from the list
            # while iterating over it skips the element after each removal.
            pending_tasks[:] = [
                pending_task for pending_task in pending_tasks
                if not self._is_task_outdated(pending_task)
            ]

            if not pending_tasks:
                self._pending_enable_tasks.pop(uid)
        self._logger.info('Cleanup of pending tasks. Still active tasks %s', self._pending_enable_tasks)

    def _run_and_log_results(self, args):
        """
        Run the command via run_in_cagefs_if_needed and log the outcome.

        Failures are logged and swallowed, nothing is raised to the caller.

        :param args: command line as a list of strings
        """
        self._logger.info('Running %s', ' '.join(args))
        try:
            output = run_in_cagefs_if_needed(args, check=True)
            self._logger.info('Command succeded with output: \n`%s`', output)
        except subprocess.CalledProcessError as e:
            self._logger.exception('Unable to activate feature in background. '
                                   'Stdout is %s. Stderr is %s', e.stdout, e.stderr)
        except Exception:
            self._logger.exception('Unable to activate feature in background')
        self._logger.debug('Finished %s', ' '.join(args))

    def _run_enable_command(self, task):
        """
        Execute the command(s) that fulfil the task.

        An advice id takes precedence: the advice is applied and the
        feature list is not used. Otherwise every feature of the task is
        enabled for the task's domain, one command per feature.
        """
        if task.advice_id:
            self._run_and_log_results([
                '/opt/alt/php-xray/cl-smart-advice-user', 'apply', '--advice_id', str(task.advice_id)
            ])
        elif task.domain:
            for f in task.feature:
                self._logger.info('activate feature in background: feature=%s', f)
                cmd = [
                    'cloudlinux-awp-user', 'enable', '--feature', f,
                    '--domain', task.domain, '--wp-path', task.wp_path
                ]
                if task.ignore_errors:
                    cmd.append('--ignore-errors')
                self._run_and_log_results(cmd)
        else:
            # report through the injected logger like the rest of the class
            self._logger.info('Task does not have any advice or domain specified, skipping')

    def suite_allowed_callback(self, client_socket_obj: socket.socket):
        """
        Process every pending task now that a suite has been allowed.

        One child process is forked per task. The parent records the
        child's pid on the task, waits for it to finish and forgets the
        user's tasks. The child drops privileges to the task owner, runs
        the enable command and returns from this method without sending
        a response.

        :param client_socket_obj: connection which receives the
            success response from the parent process
        """
        self._cleanup_pending_tasks()

        for uid, tasks in list(self._pending_enable_tasks.items()):
            for task in tasks:
                fp = os.fork()
                if fp:
                    self._logger.info('background process forked: pid=%d', fp)
                    task.pid = fp
                    os.waitpid(fp, 0)
                    self._logger.info('background process finished: pid=%d', fp)
                    # a user may have several tasks; the entry is already
                    # gone after the first one, so do not fail on the rest
                    self._pending_enable_tasks.pop(uid, None)
                else:
                    from clwpos.feature_suites import get_allowed_modules
                    # drop privileges forever.. or at least till the end of process
                    drop_user_privileges(pwd.getpwuid(task.uid).pw_name, effective_or_real=True, set_env=True)
                    allowed_features = get_allowed_modules(task.uid)
                    # NOTE(review): this loop only logs; features which are
                    # not allowed are still passed to the enable command
                    # below - confirm that is intended.
                    for f in task.feature:
                        if f not in allowed_features:
                            self._logger.info('unable to activate feature in background: feature=%s is not allowed', f)
                            continue

                    # NOTE(review): the child returns to the caller here,
                    # presumably the caller terminates it - verify, since
                    # the parent blocks in waitpid until the child exits.
                    return self._run_enable_command(task)

        response: dict = {
            "result": "success"
        }
        socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, response)

    def add_pending_upgrade_task(self, client_socket_obj: socket.socket, user_request: dict, uid: int):
        """
        Queue a task for the user and acknowledge the request.

        :param client_socket_obj: connection which receives the response
        :param user_request: must contain 'domain', 'wp_path',
            'ignore_errors' and 'feature' (comma separated names);
            'advice_id' is optional
        :param uid: id of the user the task belongs to
        :raises KeyError: when a required key is missing in user_request
        """
        self._cleanup_pending_tasks()

        new_task = EnableFeatureTask(
            uid=uid,
            domain=user_request['domain'],
            wp_path=user_request['wp_path'],
            feature=user_request['feature'].split(','),
            ignore_errors=user_request['ignore_errors'],
            advice_id=user_request.get('advice_id'),
            timeout_timestamp=int(time.time() + self._FEATURE_STATUS_CHECK_TIMEOUT)
        )
        self._pending_enable_tasks.setdefault(uid, []).append(new_task)

        self._logger.info('Successfully added pending upgrade subscription task %s', self._pending_enable_tasks[uid])

        response: dict = {
            "result": "success"
        }
        socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, response)

    def get_upgrade_task_status(self, client_socket_obj: socket.socket, uid: int, feature: str):
        """
        Report whether the user still has a pending task for the feature.

        :param client_socket_obj: connection which receives the response
        :param uid: id of the user to look up
        :param feature: feature name searched in the tasks' feature lists
        """
        self._cleanup_pending_tasks()

        pending_tasks = self._pending_enable_tasks.get(uid, [])

        response: dict = {
            "result": "success",
            "pending": any(feature in pending_task.feature for pending_task in pending_tasks)
        }
        socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, response)