ÿØÿà JPEG ÿþ;
| Server IP : 68.65.120.201 / Your IP : 216.73.216.221 Web Server : LiteSpeed System : Linux server179.web-hosting.com 4.18.0-513.18.1.lve.el8.x86_64 #1 SMP Thu Feb 22 12:55:50 UTC 2024 x86_64 User : taxhyuvu ( 2294) PHP Version : 8.1.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /opt/cloudlinux/venv/lib/python3.11/site-packages/cllimits/lib/ |
Upload File : |
# coding:utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import json
import os
import sys
import time
from typing import Optional
from clcommon.utils import run_command, ExternalProgramFailed
from clcommon.clfunc import memory_to_page, page_to_memory
from clcommon.clexception import FormattedException
VALUES_STR = 'Available values for option'
_LIMITS_LOWER_BOUNDS_CONFIG_FILE = '/etc/sysconfig/limits_lower_bounds'
class LowerBoundsConfigException(FormattedException):
pass
def _read_lower_bounds_config() -> dict:
"""
Read all lower bounds from config file
Returns dict with lower bound names as keys and values as strings
:raises LowerBoundsConfigException: If file exists but cannot be read
"""
# If file doesn't exist return empty dict
if not os.path.exists(_LIMITS_LOWER_BOUNDS_CONFIG_FILE):
return {}
lower_bounds = {}
try:
with open(_LIMITS_LOWER_BOUNDS_CONFIG_FILE, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and '=' in line and not line.startswith('#'):
key, value = line.split('=', 1)
lower_bounds[key.strip()] = value.strip()
except (OSError, IOError) as e:
raise LowerBoundsConfigException({
'message': "Failed to read limits' lower bounds configuration from %(file)s. "
"The file may be corrupted. Error: %(error)s",
'context': {
'file': _LIMITS_LOWER_BOUNDS_CONFIG_FILE,
'error': str(e)
}
}) from e
return lower_bounds
def _write_lower_bounds_config(lower_bounds: dict) -> bool:
"""
Write lower bounds to config file
"""
try:
os.makedirs(os.path.dirname(_LIMITS_LOWER_BOUNDS_CONFIG_FILE), exist_ok=True)
with open(_LIMITS_LOWER_BOUNDS_CONFIG_FILE, 'w', encoding='utf-8') as f:
f.write('# CloudLinux lower bounds configuration\n')
f.write('# Format: RESOURCE_TYPE=value\n\n')
for key, value in sorted(lower_bounds.items()):
f.write(f'{key}={value}\n')
return True
except (OSError, IOError):
return False
def read_lower_bound(resource_type: str) -> Optional[int]:
"""
Read lower bound for any resource type from config file
Returns lower bound in pages or None if not set
:param resource_type: Type of resource (e.g., 'PMEM', 'VMEM')
:raises LowerBoundsConfigException: If config file is corrupted
"""
lower_bounds = _read_lower_bounds_config()
limit_str = lower_bounds.get(resource_type.upper())
if limit_str:
return memory_to_page(limit_str)
return None
def write_lower_bound(resource_type: str, bound_value: str) -> bool:
"""
Write lower bound for any resource type to config file
:param resource_type: Type of resource (e.g., 'PMEM', 'VMEM')
:param bound_value: Lower bound value as string (e.g., '512M', '1G')
:raises LowerBoundsConfigException: If config file is corrupted
"""
lower_bounds = _read_lower_bounds_config()
lower_bounds[resource_type.upper()] = bound_value
return _write_lower_bounds_config(lower_bounds)
def check_pmem_lower_bound(pmem_pages: int) -> Optional[dict]:
"""
Check if PMEM value is below configured lower bound and return error
:param pmem_pages: PMEM value in pages
:return: Error dict with 'message' and 'context' or None
"""
try:
lower_bound_pages = read_lower_bound('PMEM')
except LowerBoundsConfigException as e:
return {'message': e.message, 'context': e.context}
if lower_bound_pages is not None and pmem_pages != 0 and pmem_pages < lower_bound_pages:
lower_bound = page_to_memory(lower_bound_pages)
return {
'message': (
'PMEM limit is below configured lower bound of %(bound)s. '
'This may cause application instability and performance issues.'
),
'context': {'bound': lower_bound}
}
return None
def check_pmem_lower_bound_from_string(pmem_value: str) -> Optional[dict]:
"""
Check if PMEM value (as string) is below configured lower bound and return error
:param pmem_value: PMEM value as string (e.g., "5m", "512M", etc.)
:return: Error dict with 'message' and 'context' or None
"""
pmem_pages = memory_to_page(pmem_value)
if pmem_pages is not None:
return check_pmem_lower_bound(pmem_pages)
return None
def replace_params(data):
"""
Replacing params in data for show error message
:param data: error's data for show message
:return:
"""
if data.startswith("--"):
param, text = data.split(" ", 1)
return {"result": "%(param)s " + text, "context": {"param": param}}
if data.startswith(VALUES_STR):
text, param = data.split(":", 1)
return {"result": text + ": %(available_options)s",
"context": {"available_options": param.strip()}}
return {"result": data}
def _is_string_number(s_val):
"""
Checks is string contains a number (integer or float)
:param s_val: String to check
:return: True - string is number, False - not number
"""
try:
float(s_val)
return True
except ValueError:
return False
def convert_mem_value_to_bytes(value):
"""
Convert value in Gbytes,Mbytes to bytes
:param value: value of mem limit
:return: value in bytes
"""
value = str(value).lower()
if value.endswith('k'):
power = 1
elif value.endswith('m'):
power = 2
elif value.endswith('g'):
power = 3
elif _is_string_number(value):
power = 1
value = f'{value}k'
else:
raise ValueError('Wrong memory value')
return int(1024 ** power * float(value[:-1]))
def _convert_memory_value_to_adaptive_format(value, convert=True):
"""
Convert memory value to adaptive value in GB, TB, etc
:param value: memory value in MB or KB
:param convert: if True - convert value, False - not convert
:return: adaptive value in GB, TB, etc
"""
if not convert:
return value
value = str(value).lower()
units = ['K', 'M', 'G', 'T', 'P']
if value.endswith('m'):
del units[0]
value = str(value).lower().replace('m', '').replace('k', '') # remove unit symbol
if value.startswith('*'):
result = '*'
value = value.replace('*', '')
else:
result = ''
value = float(value)
for unit in units:
if value // 1024 > 0:
value /= 1024
elif value == 0:
result = f'{result}0K'
break
else:
result = f'{result}{value:.2f}{unit}'
break
return result
def print_dictionary(data_dict, is_json=False):
"""
Print specified dictionary
:param data_dict: data dictionary to print
:param is_json: True - print in JSON, False - in text
:return: None
"""
# Print as JSON
# print json.dumps(data_dict, indent=2, sort_keys=True)
if is_json:
# Print as JSON
print(json.dumps(data_dict, sort_keys=True))
else:
# Print as text
print(data_dict)
def print_error_and_exit(message, is_json=False):
"""
Prints to stdout
:param: is_json - True if print error in json format, False - text
"""
data = {"timestamp": time.time(), "result": str(message)}
print_dictionary(data, is_json)
sys.exit(1)
def is_quota_supported(cl_quota_path, repquota_path):
"""
Detect quota is supported
:return: True/False - quotas supported/not supported
"""
# Check if all needed utilities present
if not os.path.isfile(cl_quota_path) or not os.path.isfile(repquota_path):
return False
return True
def is_quota_active(cl_quota_path, repquota_path):
"""
Detect quota is activated
:return: True/False - quotas activated/not activated
"""
# If quotas not supported they are not activated
if not is_quota_supported(cl_quota_path, repquota_path):
return False
# Check is quota activated
cmd = [repquota_path, '-nva']
try:
stdout = run_command(cmd)
except ExternalProgramFailed:
return False
# quotas not supported if repqouta returns nothing
if not stdout:
return False
return True