D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clwpos
/
Filename :
daemon_redis_lib.py
back
Copy
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT # # Redis manipulation library for Cloudlinux AccelerateWP daemon # pylint: disable=no-absolute-import import json import logging import pwd import os import traceback import subprocess import signal import psutil import time from logging import Logger from typing import List, Optional, Tuple from clcommon.clpwd import drop_privileges from clcommon.utils import run_command, ExternalProgramFailed, is_user_present, demote from clcommon.cpapi import cpusers from clcommon.const import Feature from clcommon.cpapi import is_panel_feature_supported from clcommon.clpwd import ClPwd from clwpos.constants import REDIS_SERVER_BIN_FILE from clwpos.cl_wpos_exceptions import WposError from clwpos.utils import USER_WPOS_DIR, is_run_under_user from clcommon.cpapi.cpapiexceptions import NoPackage from clwpos import gettext as _ logger = logging.getLogger(__name__) _REDIS_CLI_BIN_FILE = '/opt/alt/redis/bin/redis-cli' def _get_pids_for_file(file_path: str) -> List[int]: """ Retrieves list of PID list processes, which uses file (using fuser utility) This can find any process (for example php), not only redis service process :param file_path: Filename to check :return: PID list """ try: # # /usr/sbin/fuser /home/cltest1/.clwpos/redis.sock # /home/cltest1/.clwpos/redis.sock: 55882 [105766 251507] std_out = run_command(['/sbin/fuser', file_path], return_full_output=False) lines_list = std_out.split('\n') # Get PID list from output s_pid_list = lines_list[0].split(':')[1].strip() pid_list = [] for s_pid in s_pid_list.split(' '): try: pid_list.append(int(s_pid.strip())) except ValueError: pass return pid_list except (ExternalProgramFailed, IndexError): pass return [] def _get_user_pids(username: str) -> List[int]: """ Update PID list in cache for user using /bin/ps utility :param: username: Username to scan :return: None """ # /bin/ps -o"pid" -u cltest1 # PID # 1608661 # 1638657 # ...... # Get user's PID list try: std_out = run_command(['/bin/ps', '-o', 'pid', '-u', username], return_full_output=False) except ExternalProgramFailed: return [] lines_list = std_out.split('\n') if len(lines_list) < 2: return [] # Remove header line user_pid_list = [] lines_list = lines_list[1:] for line in lines_list: line = line.strip() if line: try: user_pid_list.append(int(line.strip())) except ValueError: pass return user_pid_list def _get_user_redis_pids(username: str, home_dir: str) -> List[int]: """ Get redis PID list for user :param username: user name :param home_dir: User's homedir :return: PID list or [] if user has no redis """ redis_socket_file = os.path.join(home_dir, USER_WPOS_DIR, 'redis.sock') pid_list_sock = _get_pids_for_file(redis_socket_file) user_pids = _get_user_pids(username) pid_list = [] for pid in pid_list_sock: if pid in user_pids: pid_list.append(pid) return pid_list def kill_process_by_pid(_logger: Logger, pid: int): """ Kill process by pid :param _logger: Logger to log errors :param pid: Process pid to kill """ if not is_run_under_user(): raise WposError("Internal error! Trying to kill process with root privileges") try: os.kill(pid, signal.SIGTERM) # 15 time.sleep(5) try: os.kill(pid, signal.SIGKILL) # 9 except OSError: pass except OSError as e: _logger.warning("Can't kill redis process, pid %s; error: %s", pid, str(e)) def _kill_all_redises_for_user(logger: Logger, username: str): """ Kill all user's redice processes :param logger: Logger to log errors :param username: User name """ if not is_user_present(username): return user_pwd = pwd.getpwnam(username) redis_pid_list = _get_user_redis_pids(user_pwd.pw_name, user_pwd.pw_dir) with drop_privileges(username): for redis_pid in redis_pid_list: kill_process_by_pid(logger, redis_pid) def kill_all_users_redises(logger: Logger): """ Find and kill lost redices for all panel users :param logger: Daemon's logger """ try: users = cpusers() except (OSError, IOError, IndexError, NoPackage) as e: logger.warning("Can't get user list from panel: %s", str(e)) return for username in users: _kill_all_redises_for_user(logger, username) def is_user_redis_alive(user_id: int) -> Tuple[bool, bool, dict]: """ Check user's redis is alive :param user_id: uid to check sockets return True/False - redis alive/not alive :return: Tuple: (redis is working/not working, is user present, errors dict) error - (False, False {"result": "error", "context": "..."}) """ # TODO: Refactor this in https://cloudlinux.atlassian.net/browse/LU-2506 # # /opt/alt/redis/bin/redis-cli -s /home/cltest1/.clwpos/redis.sock ping # Could not connect to Redis at /home/cltest1/.clwpos/redis.sock: No such file or directory # # echo $? # 1 # # /opt/alt/redis/bin/redis-cli -s /home/cltest1/.clwpos/redis.sock ping # PONG # # echo $? # 0 try: user_pwd = pwd.getpwuid(user_id) username = user_pwd.pw_name except KeyError: logger.debug("Redis check error for user %s. No user with such uid", str(user_id)) return False, False, {"result": _("Redis check error for user with uid %(uid)s. No such user"), "context": {"uid": str(user_id)}} try: redis_socket_path = os.path.join(user_pwd.pw_dir, USER_WPOS_DIR, 'redis.sock') if not is_panel_feature_supported(Feature.CAGEFS): # CL Solo cmd = [_REDIS_CLI_BIN_FILE, '-s', redis_socket_path, 'ping'] proc = subprocess.Popen(cmd, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL, preexec_fn=demote(user_pwd.pw_uid, user_pwd.pw_gid), cwd=user_pwd.pw_dir) else: proc = subprocess.Popen(['/sbin/cagefs_enter_user', '--no-fork', username, _REDIS_CLI_BIN_FILE, '-s', redis_socket_path, 'ping'], cwd=user_pwd.pw_dir, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) stdout_data, stderr_data = proc.communicate() if proc.returncode != 0: # Process start error return False, True, {"result": _("Redis CLI start error %(error)s for user %(user)s"), "context": {"error": f"Error happened while checking redis " f"for user '%(user)s'. stdout: '{stdout_data}'; " f"stderr: '{stderr_data}'", "user": username}} return proc.returncode == 0, True, {"result": "success"} except (ExternalProgramFailed, ClPwd.NoSuchUserException, IOError, OSError) as e: str_exc = traceback.format_exc() logger.debug("Redis check error for user %s. Error is: %s", username, str_exc) return False, True, {"result": _("Redis CLI start error %(error)s for user %(user)s"), "context": {"error": str(e), "user": username}} def _get_redis_pid_from_pid_file_with_wait(redis_pid_filename: str) -> Optional[int]: """ Get redis process PID from redis pid file. Wait up to 10 seconds :param redis_pid_filename: Redis PID filename :return: Redis PID or None on error (pid file absent/invalid or redis not started) """ for i in range(100): try: with open(redis_pid_filename, 'r') as f: pid = int(f.read().strip()) os.kill(pid, 0) return pid except (OSError, IOError, ValueError): # Error, PID file absent/invalid or redis still absent time.sleep(0.1) # Error, redis not started or pid file read/parse error return None def reload_redis_for_user_thread(username: str, old_redis_pid: Optional[int], force_reload: str = 'no') -> Tuple[Optional[int], dict]: """ Reloads redis for supplied user via helper script. Should be trun in thread :param username: Username to setup redis :param old_redis_pid: Old Redis PID for kill :param force_reload: reload redis w/o checks :return: Tuple: If redis was started for user - (PID of new redis process, {"result": "success"}) else - redis was not started - (None, {"result": "error", "context": ""}) """ try: user_pwd = pwd.getpwnam(username) except (KeyError, OSError, ): logger.debug("Can't reload redis for user '%s'. User not found.", username) return None, {"result": _("Can't reload redis for user '%(user)s'. User not found."), "context": {"user": username}} logger.info('Calling redis_reloader with parameters: username: %s, old redis pid: %s, force reload: %s', username, str(old_redis_pid), force_reload) try: # Run redis_reloader_script proc = subprocess.Popen(['/usr/share/cloudlinux/wpos/redis_reloader.py', username, str(old_redis_pid), force_reload], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) stdout, stderr = proc.communicate() except (OSError, IOError,) as e: logger.debug("Reload redis error for user '%s'. Error is %s", username, str(e)) return None, {"result": _("Reload redis error for user '%(user)s'. Error is %(msg)s"), "context": {"user": username, "msg": str(e)}} try: reload_result_dict = json.loads(stdout) if reload_result_dict['result'] != 'success': return None, reload_result_dict except (KeyError, json.JSONDecodeError, TypeError): return None, {"result": _("Reload redis for user '%(user)s' decode error: %(error_msg)s"), "context": {"user": username, "error_msg": stdout}} # Redis was started, get PID pidfile_path = os.path.join(user_pwd.pw_dir, USER_WPOS_DIR, 'redis.pid') redis_pid = None if reload_result_dict.get('redis_enabled', True): redis_pid = _get_redis_pid_from_pid_file_with_wait(pidfile_path) return redis_pid, {"result": "success"} def parse_redises() -> List[Tuple[int, int]]: """ Get redis process by parsing psutil.process_iter Return list of tuples: [(user_uid, process_pid)] """ res = [] for proc in psutil.process_iter(['name']): if proc.info['name'] == 'redis-server': res.append(_validate_redis_proc(proc)) return list(filter(None, res)) def _validate_redis_proc(p: psutil.Process) -> Optional[Tuple[int, int]]: """ Ensure that redis process is ours: 1. Right binary (alt-redis) 2. Right socket """ redis_bin = REDIS_SERVER_BIN_FILE uid = p.uids().real pw = pwd.getpwuid(uid) user_home = pw.pw_dir sock_abspath = f'unixsocket:{user_home}/.clwpos/redis.sock' cmd = ' '.join(p.cmdline()) if cmd.startswith(redis_bin) and sock_abspath in cmd: return uid, p.pid return None