D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
thread-self
/
root
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clcommon
/
lib
/
Filename :
cledition.py
back
Copy
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT # # pylint: disable=no-absolute-import import configparser import os import subprocess import warnings from enum import Enum from typing import AnyStr, Tuple, Optional from jwt import exceptions from clcommon.lib.consts import CLN_JWT_TOKEN_PATH, CL_EDITION_FILE_FOR_USERS from clcommon.lib.jwt_token import read_jwt, decode_jwt, jwt_token_check from clcommon.clexception import FormattedException from clcommon.clcagefs import in_cagefs CL_SOLO_EDITION_FILE_MARKER = '/etc/cloudlinux-edition-solo' CL_ADMIN_EDITION_FILE_MARKER = '/etc/cloudlinux-edition-admin' SHARED_PRO_EDITION_HUMAN_READABLE = 'CloudLinux OS Shared Pro' SHARED_EDITION_HUMAN_READABLE = 'CloudLinux OS Shared' SOLO_EDITION_HUMAN_READABLE = 'CloudLinux OS Solo' ADMIN_EDITION_HUMAN_READABLE = 'CloudLinux OS Admin' class SupportedEditions(Enum): """ Keeps supported CloudLinux editions """ SOLO = 'solo' SHARED = 'shared' SHARED_PRO = 'shared_pro' ADMIN = 'admin' class CLEditionDetectionError(FormattedException): def __init__(self, message, **context): FormattedException.__init__(self, { 'message': message, 'context': context }) HUMAN_READABLE_TOKEN_EDITION_PAIRS = { SOLO_EDITION_HUMAN_READABLE: SupportedEditions.SOLO.value, SHARED_PRO_EDITION_HUMAN_READABLE: SupportedEditions.SHARED_PRO.value, SHARED_EDITION_HUMAN_READABLE: SupportedEditions.SHARED.value, ADMIN_EDITION_HUMAN_READABLE: SupportedEditions.ADMIN.value } class CLEditions: @staticmethod def get_from_jwt(token=None, verify_exp=True): """ Note: be careful when modifying this method. Passing token in is used in X-Ray, ask @dkavchuk or someone else from C-Projects team for details :param token: :param verify_exp: :return: """ if token is None: token = read_jwt(CLN_JWT_TOKEN_PATH) try: jwt = decode_jwt(token, verify_exp=verify_exp) except exceptions.PyJWTError as e: raise CLEditionDetectionError(f'Unable to detect edition from jwt token: {CLN_JWT_TOKEN_PATH}. ' f'Please, make sure it is not broken, error: {e}') try: return jwt['edition'] except KeyError: # fallback for old format of tokens cl_plus = jwt.get('cl_plus') if cl_plus is None: raise CLEditionDetectionError( f'Unable to detect edition from jwt token: {CLN_JWT_TOKEN_PATH}. ' f'Please, make sure it is not broken, error: not enough fields for detection') return SupportedEditions.SHARED_PRO.value if cl_plus else SupportedEditions.SHARED.value @staticmethod def is_package_installed(package_name): process = subprocess.Popen(['rpm', '-q', package_name], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) out, err = process.communicate() if process.returncode != 0 and 'is not installed' in out: return False elif process.returncode != 0: raise CLEditionDetectionError(f'Unable to check is package {package_name} is installed, ' f'reason: {out}, {err}') return True @classmethod def get_cl_edition(cls, skip_jwt_check=False, skip_marker_check=False, raw_jwt=None, verify_exp=True): """ 1. Try to detect edition from jwt token if edition field is in token -> return edition if edition field is not present -> recheck via cl_plus flag if token is broken -> raise error about it 2. Try to detect from file if edition is in file -> return edition if file is broken -> raise about it if file is missed -> check if meta package is present Detection from file may be switched off using skip_marker_check In case there is no token with correct edition, no file we consider edition as regular CL. Note: be careful when modifying this method. It is used in X-Ray, ask @dkavchuk or someone else from C-Projects team for details """ # skipping both jwt and file checks is not allowed if skip_jwt_check and skip_marker_check: raise CLEditionDetectionError( 'Unable to detect edition: neither jwt token check, no file marker check enabled') if not skip_jwt_check and os.path.isfile(CLN_JWT_TOKEN_PATH): try: edition = cls.get_from_jwt(token=raw_jwt, verify_exp=verify_exp) except PermissionError: edition = user_cl_edition() if edition: return edition # In case if user in cagefs. if in_cagefs(): return user_cl_edition() # if fallback to file is applicable if not skip_marker_check: # jwt has no 'edition' field -> ensure it is really solo via file if os.path.isfile(CL_SOLO_EDITION_FILE_MARKER): return SupportedEditions.SOLO.value elif os.path.isfile(CL_ADMIN_EDITION_FILE_MARKER): return SupportedEditions.ADMIN.value return user_cl_edition() or SupportedEditions.SHARED.value def is_cl_solo_edition(skip_jwt_check=True, skip_marker_check=False, verify_exp=True): """ Allow skip_jwt_check ONLY if it not critical when license is not valid Use skip_marker_check=True when validity of license is critical and fallback to file is not applicable Note: be careful when modifying this method. It is used in X-Ray and SSA, ask @dkavchuk or someone else from C-Projects team for details """ edition = CLEditions.get_cl_edition( skip_jwt_check=skip_jwt_check, skip_marker_check=skip_marker_check, verify_exp=verify_exp ) return edition is not None and edition == SupportedEditions.SOLO.value def is_cl_shared_edition(skip_jwt_check=False, skip_marker_check=False, verify_exp=True): """ Allow skip_jwt_check ONLY if it not critical when license is not valid Use skip_marker_check=True when validity of license is critical and fallback to file is not applicable """ edition = CLEditions.get_cl_edition( skip_jwt_check=skip_jwt_check, skip_marker_check=skip_marker_check, verify_exp=verify_exp ) return edition is not None and edition == SupportedEditions.SHARED.value def is_cl_shared_pro_edition(skip_jwt_check=False, skip_marker_check=False, verify_exp=True): """ Allow skip_jwt_check ONLY if it not critical when license is not valid Use skip_marker_check=True when validity of license is critical and fallback to file is not applicable """ edition = CLEditions.get_cl_edition( skip_jwt_check=skip_jwt_check, skip_marker_check=skip_marker_check, verify_exp=verify_exp ) return edition is not None and edition == SupportedEditions.SHARED_PRO.value def is_cl_shared_pro_edition_safely(**kwargs): """ Supress edition checking error and returns False """ try: return is_cl_shared_pro_edition(**kwargs) except CLEditionDetectionError: return False def is_cl_admin_edition(skip_jwt_check=False, skip_marker_check=False, verify_exp=True): """ Allow skip_jwt_check ONLY if it not critical when license is not valid Use skip_marker_check=True when validity of license is critical and fallback to file is not applicable """ edition = CLEditions.get_cl_edition( skip_jwt_check=skip_jwt_check, skip_marker_check=skip_marker_check, verify_exp=verify_exp ) return edition is not None and edition == SupportedEditions.ADMIN.value def get_cl_edition_readable() -> AnyStr: """ Function returns current edition of CL: - CloudLinux OS Shared - CloudLinux OS Shared Pro - CloudLinux OS Solo - Error string """ try: if is_cl_solo_edition(skip_jwt_check=True): return SOLO_EDITION_HUMAN_READABLE elif is_cl_admin_edition(skip_jwt_check=True): return ADMIN_EDITION_HUMAN_READABLE except CLEditionDetectionError: return SHARED_EDITION_HUMAN_READABLE success_flag, error_message, _ = jwt_token_check() # This error message indicates that user could not read jwt.token because not enough privileges # We have to check existence of token file, to be sure that privilege error acquired if 'read error' in error_message and (os.path.isfile(CLN_JWT_TOKEN_PATH) or in_cagefs()): with open(CL_EDITION_FILE_FOR_USERS, 'r') as f: return f.read().strip() if success_flag: return SHARED_PRO_EDITION_HUMAN_READABLE else: return SHARED_EDITION_HUMAN_READABLE def print_skip_message_on_solo(): """Just print skip message""" warnings.warn( 'as part of unification effort, features might not map directly to editions', DeprecationWarning, ) print('CloudLinux Solo edition detected! \n' 'Command is skipped, because it is unsupported and unneeded on current edition') # NOTE(vlebedev): To properly avoid circular imports, these 2 functions' implementations # are moved to "utils" submodule of "clcommon.cpapi" as it is the level # where all necessary panel stuff is imported and constructed and which # uses "clcommon.lib" module as a dependency. In other words "clcommon.lib" lays # lower in the modules hierarchy and should not import from "clcommon.cpapi"). def skip_without_lve(): warnings.warn( 'since v3.4.1 use "clcommon.cpapi.utils:skip_without_lve" instead', DeprecationWarning, ) from clcommon.cpapi.utils import skip_without_lve return skip_without_lve() def lve_supported_or_exit(f): warnings.warn( 'since v3.4.1 use "clcommon.cpapi.utils:lve_supported_or_exit" instead', DeprecationWarning, ) from clcommon.cpapi.utils import lve_supported_or_exit return lve_supported_or_exit(f) def user_cl_edition(): """This function is used as workaround for users to be able to get CL Edition. Crontab will write to CL_EDITION_FILE_FOR_USERS, output of cldetect --detect-edition command with 0644 permission. """ if os.path.exists(CL_EDITION_FILE_FOR_USERS): with open(CL_EDITION_FILE_FOR_USERS, 'r') as f: edition = f.read().strip() return HUMAN_READABLE_TOKEN_EDITION_PAIRS.get(edition) return None def get_os_version() -> Tuple[Optional[str], Optional[str]]: """ Detect system name and version :return: tuple (os_name, os_ver) """ # # cat /etc/os-release | grep -E '^NAME|^VERSION_ID' # NAME="Ubuntu" # VERSION_ID="20.04" # # cat /etc/os-release | grep -E '^NAME|^VERSION_ID' # NAME="CloudLinux" # VERSION_ID="7.9" try: os_release_filename = '/etc/os-release' section_name = 'top' config = configparser.ConfigParser() # config.read('/etc/os-release') with open(os_release_filename) as stream: config.read_string(f"[{section_name}]\n" + stream.read()) os_name = config.get(section_name, 'NAME').strip('"') os_ver = config.get(section_name, 'VERSION_ID').strip('"') return os_name, os_ver except (OSError, IOError, configparser.Error): pass return None, None def is_ubuntu() -> bool: """ Detertmines is this system Ubuntu :return: bool flag is_ubuntu """ os_name, _ = get_os_version() return os_name == 'Ubuntu' def is_container() -> bool: """ Determines is this system running inside container """ return os.path.exists('/etc/cloudlinux-container') def is_secureboot_enabled() -> bool: """ Determines if secure boot is turned on :return: bool flag is_secureboot_enabled """ enabled = False if os.path.exists('/sys/firmware/efi'): enabled = subprocess.call('mokutil --sb-state | grep enabled', shell=True, executable='/bin/bash', stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) == 0 return enabled