D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
xray
/
continuous
/
Filename :
manager.py
back
Copy
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains continuous tracing manager, aimed to enable|disable tracing
"""
from urllib.parse import urlparse

from xray import gettext as _

from .common import ContinuousCommon
from ..internal.exceptions import XRayError, XRayManagerExit
from ..internal.utils import timestamp


class ContinuousManager(ContinuousCommon):
    """
    enable|disable|start|stop|list continuous tracing

    Entries live in ``self.tracing_conf``, a mapping keyed by domain name.
    The mapping itself and the ``dump_tracing_configuration`` /
    ``remove_tracing_configuration`` helpers are not defined in this class
    (presumably provided by ``ContinuousCommon`` -- verify there).
    """

    @staticmethod
    def url_sanitize(url: str) -> str:
        """
        Leaves only domain path of URL, without www. prefix and :port postfix

        :param url: URL to reduce
        :return: string of the form ``<scheme>://<host>``
        """
        fragments = urlparse(url)
        netloc = fragments.netloc
        # Strip 'www.' only when it is a real prefix: a blanket
        # str.replace() would also mangle hosts that merely contain the
        # substring (e.g. 'awww.example.com' -> 'aexample.com').
        if netloc.startswith('www.'):
            netloc = netloc[len('www.'):]
        # Everything after the first ':' is treated as the port
        host = netloc.split(':')[0]
        return f"{fragments.scheme}://{host}"

    def enable(self, domain: str, url: str, email: str) -> None:
        """
        Enable continuous tracing

        Registers a new entry for ``domain`` in ``self.tracing_conf`` with
        status "running" and then calls ``dump_tracing_configuration``.

        :param domain: domain name, used as the key of the new entry
        :param url: URL to trace; stored in sanitized form as "original_url"
        :param email: e-mail address stored with the entry
        :raises XRayManagerExit: if an entry for ``domain`` already exists
        """
        if domain in self.tracing_conf:
            raise XRayManagerExit(
                _('Continuous monitoring for domain "%s" is already enabled') % domain)

        self.tracing_conf[domain] = {
            "creation_time": timestamp(),
            "domain": domain,
            "original_url": self.url_sanitize(url),
            "email": email,
            "execution_count": 0,
            "status": "running"
        }
        self.dump_tracing_configuration()

    def disable(self, domain: str) -> None:
        """
        Disable continuous tracing

        Delegates to ``remove_tracing_configuration`` for ``domain``.
        """
        self.remove_tracing_configuration(domain)

    def start(self, domain: str) -> None:
        """
        Start continuous monitoring (set status 'running' for ``domain``)
        """
        self.update_status(domain, 'running')

    def stop(self, domain: str) -> None:
        """
        Stop continuous monitoring (set status 'stopped' for ``domain``)
        """
        self.update_status(domain, 'stopped')

    def update_status(self, domain: str, status: str) -> None:
        """
        Set given 'status' for requested 'domain'

        :param domain: domain whose entry should be updated
        :param status: new status value to store
        :raises XRayError: if ``domain`` has no usable entry in
                           ``self.tracing_conf`` or the entry already
                           has the requested status
        """
        try:
            entry = self.tracing_conf[domain]
            current_status = entry['status']
        except KeyError:
            self.logger.error('Continuous monitoring is not enabled',
                              extra={'domain': domain,
                                     'tracing_entries': self.tracing_conf})
            raise XRayError(
                _('Continuous monitoring for %s is not enabled') % domain)

        if current_status == status:
            self.logger.error('Continuous monitoring is already %s', status,
                              extra={'domain': domain,
                                     'tracing_entry': entry})
            # Translate the template first and interpolate afterwards:
            # formatting before the gettext call makes the catalog lookup
            # miss, so the message would never be translated.
            raise XRayError(
                _('Continuous monitoring for {} is already {}').format(
                    domain, status))

        self.tracing_conf[domain]['status'] = status
        self.dump_tracing_configuration()

    def get_tracing_list(self) -> list:
        """
        Return all continuous tracing tasks

        :return: list of the values stored in ``self.tracing_conf``
        """
        return list(self.tracing_conf.values())