D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
thread-self
/
root
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clwpos
/
user
/
Filename :
cache.py
back
Copy
import hashlib
import logging
import os
import pwd
from contextlib import contextmanager
from dataclasses import dataclass
from typing import ContextManager, Optional

from clwpos import constants, scoped_cache


def _get_cache_directory():
    """Return the cache directory of the effective user, creating it if needed.

    The directory is ``<home>/<constants.USER_WPOS_DIR>/.cache`` where
    ``<home>`` is the home directory of the user the process is running as
    (effective uid).
    """
    user = pwd.getpwuid(os.geteuid())
    cache_dir = os.path.join(user.pw_dir, constants.USER_WPOS_DIR, '.cache')
    os.makedirs(cache_dir, exist_ok=True)
    return cache_dir


def _get_cache_file(key: str, path: str) -> str:
    """Return the cache file path used for *key* and *path*.

    The file name is ``<key>.<md5 of path>.cache``; md5 is used only to
    derive a fixed-length file name from the path, not for security.
    """
    path_digest = hashlib.md5(path.encode()).hexdigest()
    return os.path.join(_get_cache_directory(), f'{key}.{path_digest}.cache')


def _get_wp_config_modification_ts(wp_path):
    """Return the modification time of the ``wp-config.php`` for *wp_path*.

    Looks in *wp_path* first and falls back to its parent directory.
    Raises an ``OSError`` (e.g. ``FileNotFoundError``) if the fallback
    location cannot be stat'ed either.
    """
    try:
        return os.path.getmtime(os.path.join(wp_path, 'wp-config.php'))
    except FileNotFoundError:
        return os.path.getmtime(os.path.join(wp_path, '../wp-config.php'))


@dataclass
class CacheRecord:
    """In-memory view of one cache entry which tracks whether it was changed."""

    # any data that we would like to save
    # important! do not use pickle or any other serializable
    # data format that might be loaded in root environment
    data: Optional[str]
    # marker which becomes true whenever we change our data
    # NOTE: must stay the LAST field - the generated __init__ assigns fields
    # in declaration order through __setattr__, so assigning is_dirty last
    # is what resets the marker to its initial value after construction
    is_dirty: bool = False

    def __setattr__(self, key, value):
        # any assignment marks the record as dirty; an explicit assignment
        # to is_dirty itself still wins because it is applied afterwards
        super().__setattr__('is_dirty', True)
        super().__setattr__(key, value)


@contextmanager
def wp_config_cache(key, path) -> ContextManager[CacheRecord]:
    """Yield a CacheRecord for *key*/*path* and persist it on exit if changed.

    The cached value is considered valid only if it was written after the
    last modification of wp-config.php. When caching is disabled
    (``scoped_cache.CACHING_ENABLED`` is false) an empty record is yielded
    and nothing is read or written.
    """
    # there is no need to cache these values
    # in not actively requested places
    if not scoped_cache.CACHING_ENABLED:
        yield CacheRecord(data=None)
        return

    ts = _get_wp_config_modification_ts(wp_path=path)
    record = CacheRecord(data=get(key, path, valid_after_ts=ts))
    try:
        yield record
    finally:
        # persist even if the body raised, but only when something changed
        if record.is_dirty:
            set(key, path, record.data)


def get(key: str, path: str, valid_after_ts: float) -> Optional[str]:
    """Return the cached value for *key*/*path*, or None on a cache miss.

    A miss is reported when the cache file is older than *valid_after_ts*,
    does not exist, is not accessible, or cannot be decoded as text.
    """
    cache_file = _get_cache_file(key, path)
    try:
        # if file was modified earlier than timestamp,
        # we assume that cache is expired
        if os.path.getmtime(cache_file) < valid_after_ts:
            logging.info('Cache "%s" assumed to be outdated', key)
            return None

        with open(cache_file, 'r') as f:
            return f.read()
    except (OSError, UnicodeDecodeError):
        # assume that file does not exist, we don't have access,
        # or its content is not valid text
        logging.info('Cache "%s" is not existing or malformed', key)
        return None


def set(key: str, path: str, value: Optional[str]) -> None:
    """Store *value* in the cache for *key*/*path* (best effort).

    ``None`` means "no cached value": the cache file is removed instead of
    being written, so a later ``get`` reports a miss. I/O errors are
    ignored because caching is only an optimization.
    """
    cache_file = _get_cache_file(key, path)
    try:
        if value is None:
            # writing None would raise TypeError after the file had already
            # been truncated; drop the entry instead
            os.remove(cache_file)
        else:
            with open(cache_file, 'w') as f:
                f.write(value)
    except OSError:
        # covers a missing file on removal as well as write failures
        return None