ÿØÿà JPEG ÿþ; 403WebShell
403Webshell
Server IP : 68.65.120.201  /  Your IP : 216.73.216.184
Web Server : LiteSpeed
System : Linux server179.web-hosting.com 4.18.0-513.18.1.lve.el8.x86_64 #1 SMP Thu Feb 22 12:55:50 UTC 2024 x86_64
User : taxhyuvu ( 2294)
PHP Version : 8.1.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/cl_website_collector/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/cl_website_collector//apache_processor.py
# -*- coding: utf-8 -*-
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2024 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import logging
import os
import subprocess
import shutil
from typing import Dict, List, Optional, Any


class ApacheProcessor:
    """
    Collects Apache configuration and module information.
    """

    def __init__(self, logger: logging.Logger):
        # General Apache configuration directories (exclude user-specific subdirs)
        self.apache_config_locations = [
            '/etc/apache2/conf/',
            '/etc/apache2/conf-available/',
            '/etc/apache2/conf.d/',  # Will be filtered to exclude userdata subdirs
            '/etc/apache2/conf.modules.d/',
        ]

        # User-specific configuration base paths (Apache versions 2.2 and 2.4)
        self.user_config_base_paths = [
            '/etc/apache2/conf.d/userdata/ssl/2_4',  # SSL configs for Apache 2.4
            '/etc/apache2/conf.d/userdata/std/2_4',  # Standard configs for Apache 2.4
            '/etc/apache2/conf.d/userdata/ssl/2_2',  # SSL configs for Apache 2.2 (legacy)
            '/etc/apache2/conf.d/userdata/std/2_2',  # Standard configs for Apache 2.2 (legacy)
        ]

        self.logger = logger

        self._apache_binary_cache = None
        self._apache_version_cache = None
        self._apache_modules_cache = None

    def _find_apache_binary(self) -> Optional[str]:
        """
        Find Apache binary executable using which command with caching.
        """
        if self._apache_binary_cache is not None:
            return self._apache_binary_cache

        error = None
        try:
            for binary_name in ['httpd', 'apache2']:
                binary_path = shutil.which(binary_name)
                if binary_path and os.access(binary_path, os.X_OK):
                    self.logger.debug("Found Apache binary: %s", binary_path)
                    self._apache_binary_cache = binary_path
                    return binary_path
        except Exception as e:
            error = e

        self.logger.error("[WEBSITE-COLLECTOR] Apache binary not found in PATH %s", error)
        self._apache_binary_cache = None
        return None

    def _get_apache_version(self) -> str:
        """
        Get Apache version information with caching.
        """
        if self._apache_version_cache is not None:
            return self._apache_version_cache

        apache_binary = self._find_apache_binary()
        if not apache_binary:
            self._apache_version_cache = "Apache binary not found"
            return self._apache_version_cache

        try:
            result = subprocess.run(
                [apache_binary, '-v'],
                capture_output=True,
                text=True,
                timeout=10,
                check=False
            )

            if result.returncode == 0:
                # Extract just the version line for cleaner metadata
                version_output = result.stdout.strip()
                if version_output:
                    # Get first line which contains version info
                    first_line = version_output.split('\n')[0]
                    self._apache_version_cache = first_line
                else:
                    self._apache_version_cache = "Version info empty"
            else:
                self._apache_version_cache = f"Error getting version: {result.stderr.strip()}"

        except subprocess.TimeoutExpired:
            self._apache_version_cache = "Version check timed out"
        except Exception as e:
            self._apache_version_cache = f"Version check failed: {e}"

        return self._apache_version_cache

    def _get_apache_modules(self) -> List[str]:
        """
        Get Apache modules using httpd -M command with caching.

        Returns:
            List of module names (e.g., ['core_module', 'so_module', 'http_module', ...])
        """
        # Return cached modules if available
        if self._apache_modules_cache is not None:
            return self._apache_modules_cache

        apache_binary = self._find_apache_binary()
        if not apache_binary:
            self.logger.debug("Apache binary not found, cannot get modules")
            self._apache_modules_cache = []
            return self._apache_modules_cache

        try:
            result = subprocess.run(
                [apache_binary, '-M'],
                capture_output=True,
                text=True,
                timeout=10,
                check=False
            )

            if result.returncode == 0:
                modules = []
                lines = result.stdout.split('\n')

                # Skip first line 'Loaded Modules:' and process module lines
                for line in lines[1:]:
                    if not line.strip():
                        continue

                    # Parse line like: "core_module (static)" or "so_module (shared)"
                    try:
                        module_name = line.strip().split(' ')[0]
                        if module_name:
                            modules.append(module_name)
                    except (IndexError, AttributeError):
                        continue  # Skip malformed lines

                self.logger.debug("Found %d Apache modules via %s -M", len(modules), apache_binary)
                self._apache_modules_cache = modules
                return modules

            self.logger.debug("Apache modules command failed: %s", result.stderr.strip())
            self._apache_modules_cache = []
            return self._apache_modules_cache

        except subprocess.TimeoutExpired:
            self.logger.error("[WEBSITE-COLLECTOR] Apache modules command timed out")
            self._apache_modules_cache = []
            return self._apache_modules_cache
        except Exception as e:
            self.logger.error("[WEBSITE-COLLECTOR] Failed to get Apache modules: %s", e)
            self._apache_modules_cache = []
            return self._apache_modules_cache

    def get_apache_system_info(self) -> Dict[str, Any]:
        """
        Get Apache system information for export as separate record.

        Returns:
            Dict with system information to be exported as single record
        """
        modules = self._get_apache_modules()
        version = self._get_apache_version()

        system_info = {
            'apache_version': version,
            'loaded_modules': modules,
        }

        return system_info

    def _get_user_config_locations(self, usernames: List[str]) -> List[str]:
        """
        Get Apache configuration directories for specific users only.

        Args:
            usernames: List of usernames to get configurations for

        Returns:
            List of directories containing user-specific Apache configurations
        """
        user_locations = []

        for username in usernames:
            for base_path in self.user_config_base_paths:
                user_path = os.path.join(base_path, username)
                if os.path.isdir(user_path):
                    user_locations.append(user_path)
                    self.logger.debug("Found user config directory: %s", user_path)

        return user_locations

    def collect_user_specific_config_paths(self, usernames: List[str]) -> Dict[str, Any]:
        """
        Collect paths to user-specific Apache configuration files.

        Args:
            usernames: List of usernames to collect configs for

        Returns:
            Dict with collected config file paths as simple list
        """
        config_data = {
            'config_paths': []
        }

        if not usernames:
            return config_data

        try:
            user_locations = self._get_user_config_locations(usernames)
            self.logger.debug("Scanning %d user-specific locations for %d users",
                              len(user_locations), len(usernames))

            for location in user_locations:
                if os.path.isdir(location):
                    self._collect_config_paths_from_directory(location, config_data, exclude_user_dirs=False)

        except Exception as e:
            self.logger.error("[WEBSITE-COLLECTOR] Error in collect_user_specific_config_paths: %s", e)

        self.logger.debug("Found %d user-specific Apache config files", len(config_data['config_paths']))

        return config_data

    def collect_global_config_paths(self) -> Dict[str, Any]:
        """
        Collect paths to general Apache configuration files (non-user-specific).

        Returns:
            Dict with collected config file paths as simple list
        """
        config_data = {
            'config_paths': []
        }

        try:
            for location in self.apache_config_locations:
                if os.path.isdir(location):
                    try:
                        # For conf.d directory, exclude userdata subdirs to avoid collecting user-specific configs
                        exclude_users = location.rstrip('/').endswith('/conf.d')
                        self._collect_config_paths_from_directory(location, config_data,
                                                                  exclude_user_dirs=exclude_users)
                        self.logger.debug("Collected config paths from %s (exclude_user_dirs=%s)",
                                          location, exclude_users)
                    except Exception as e:
                        self.logger.error("[WEBSITE-COLLECTOR] Error scanning config directory %s: %s", location, e)

            self.logger.debug("Found %d general Apache config files", len(config_data['config_paths']))

        except Exception as e:
            self.logger.error("[WEBSITE-COLLECTOR] Error in collect_global_config_paths: %s", e)

        return config_data

    def _collect_config_paths_from_directory(self, location: str, config_data: Dict[str, Any],
                                             exclude_user_dirs: bool = False) -> None:
        """
        Helper method to collect configuration file paths from a specific directory.

        Args:
            location: Directory path to scan for .conf files
            config_data: Dictionary with 'config_paths' list to store collected paths
            exclude_user_dirs: If True, skip userdata subdirectories to avoid collecting user-specific configs
        """
        max_depth = 10  # Reasonable limit for Apache configs
        for root, dirs, files in os.walk(location):
            # Calculate current depth relative to base location
            depth = root[len(location):].count(os.sep)
            if depth >= max_depth:
                dirs[:] = []  # Don't descend further
                continue

            # Exclude userdata directories when collecting general configs
            if exclude_user_dirs and 'userdata' in dirs:
                dirs.remove('userdata')
                self.logger.debug("Excluded userdata directory from general config collection in %s", root)

            for filename in files:
                if filename.endswith('.conf'):
                    file_path = os.path.join(root, filename)
                    # Check if file exists (including symlinks) and is readable
                    if (os.path.exists(file_path) and
                            (os.path.isfile(file_path) or os.path.islink(file_path)) and
                            os.access(file_path, os.R_OK)):

                        # Add to simple list - no grouping by directory needed
                        config_data['config_paths'].append({
                            'file_path': file_path,
                            'is_symlink': os.path.islink(file_path)
                        })
                    else:
                        self.logger.debug("Cannot access config file: %s", file_path)

Youez - 2016 - github.com/yon3zu
LinuXploit