D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
xray
/
internal
/
Filename :
fpm_utils.py
back
Copy
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from dataclasses import dataclass from typing import Optional from xray import gettext as _ from .constants import fpm_stat_storage, fpm_reload_timeout from .exceptions import XRayError from .utils import dbm_storage, timestamp @dataclass class FPMReloadController: """ Class to control the frequency of FPM service reloading """ fpm_service: str @property def latest_reload(self) -> Optional[int]: """ Get the timestamp of latest reload of FPM service """ try: with dbm_storage(fpm_stat_storage) as _fpm_reload_stat: try: return int(_fpm_reload_stat[self.fpm_service].decode()) except KeyError: return None except RuntimeError as e: raise XRayError( _('Failed to get the timestamp of latest FPM reload: %s') % str(e), flag='warning') def save_latest_reload(self): """ Save the timestamp of latest reload of FPM service """ try: # save timestamp of latest reload of corresponding service with dbm_storage(fpm_stat_storage) as _fpm_reload_stat: _fpm_reload_stat[self.fpm_service] = str(timestamp()) except RuntimeError as e: raise XRayError( _('Failed to save the timestamp of latest FPM reload %s, but main operation executed') % str(e), flag='warning') def delta(self) -> float: """ Get the time delta between current time and saved timestamp """ try: # we store timestamp in seconds, # but timeout -- fpm_reload_timeout -- in minutes return (timestamp() - self.latest_reload) / 60 except (TypeError, XRayError): # 1 hour, # big enough for non-blocking flow in case of unexpected errors return 60.0 def restrict(self) -> bool: """ Check if FPM reload should be blocked """ return self.delta() <= fpm_reload_timeout