============================================================================================================================================= | # Title : Cloudbleed Scanner - Detects Cloudflare Memory Leak Patterns | | # Author : indoushka | | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 145.0.2 (64 bits) | | # Vendor : https://www.cloudflare.com/ | ============================================================================================================================================= [+] References : https://packetstorm.news/files/id/212490/ [+] Summary : Cloudbleed Scanner is a comprehensive security tool designed to detect memory leak patterns similar to the 2017 Cloudbleed incident, where Cloudflare's reverse proxies leaked uninitialized memory containing sensitive data. [+] POC : python poc.py #!/usr/bin/env python3 """ Cloudbleed Scanner - Detects Cloudflare Memory Leak Patterns Author: indoushka """ import asyncio import aiohttp import json import re import sys import os from datetime import datetime, timedelta import logging import ssl import certifi import hashlib import base64 from typing import Dict, List, Set, Optional, Any, Tuple from collections import defaultdict from dataclasses import dataclass import sqlite3 from pathlib import Path from urllib.parse import urlparse # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # SSL Context ssl_context = ssl.create_default_context(cafile=certifi.where()) @dataclass class IOCClassification: """IOC Classification Levels""" critical: List[str] suspicious: List[str] low_risk: List[str] @dataclass class MITRETactic: """MITRE ATT&CK Tactic Mapping""" id: str name: str techniques: List[str] confidence: float class CompleteReportSaver: """Save COMPLETE reports with ALL details - NO TRUNCATION""" @staticmethod def decode_jwt(token: str) -> Dict: """Decode JWT token to header and payload - COMPLETE""" try: parts = token.split('.') if len(parts) != 3: return {} # Decode header header_padding = '=' * (4 - len(parts[0]) % 4) if len(parts[0]) % 4 else '' payload_padding = '=' * (4 - len(parts[1]) % 4) if len(parts[1]) % 4 else '' header = base64.b64decode(parts[0] + header_padding).decode('utf-8', errors='ignore') payload = base64.b64decode(parts[1] + payload_padding).decode('utf-8', errors='ignore') return { 'header': json.loads(header) if header else {}, 'payload': json.loads(payload) if payload else {} } except Exception as e: return {'error': str(e)} @staticmethod def format_hex_string(hex_str: str) -> str: """Format hex string with grouping for better readability""" if len(hex_str) > 100: # Group every 8 characters grouped = ' '.join([hex_str[i:i+8] for i in range(0, len(hex_str), 8)]) return f"{grouped}\nLength: {len(hex_str)} characters" return hex_str @staticmethod def format_binary_data(binary_str: str) -> str: """Format binary/non-printable data""" if not binary_str: return "" # Show hex representation for non-printable hex_repr = binary_str.encode('utf-8', errors='ignore').hex() printable = ''.join([c if 32 <= ord(c) < 127 else '.' for c in binary_str]) result = f"Raw: {binary_str}\n" result += f"Hex: {hex_repr}\n" result += f"Printable: {printable}\n" result += f"Length: {len(binary_str)} characters" return result @staticmethod def save_complete_report(result: Dict, filename: str = None) -> str: """Save COMPLETE report in TXT format - NO TRUNCATION""" if filename is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") domain = urlparse(result['url']).netloc.replace('.', '_')[:50] filename = f"CLOUDBLEED_COMPLETE_REPORT_{domain}_{timestamp}.txt" with open(filename, 'w', encoding='utf-8', errors='replace') as f: # ==================== REPORT HEADER ==================== f.write("="*120 + "\n") f.write("๐Ÿšจ CLOUDBLEED COMPLETE THREAT INTELLIGENCE SCAN REPORT ๐Ÿšจ\n") f.write("โš ๏ธ Cloudflare Reverse Proxies Memory Leak Detection - COMPLETE DATA DISPLAY โš ๏ธ\n") f.write("="*120 + "\n\n") # ==================== BASIC INFORMATION ==================== f.write("๐Ÿ“Š ๐Ÿ“Š ๐Ÿ“Š BASIC INFORMATION ๐Ÿ“Š ๐Ÿ“Š ๐Ÿ“Š\n") f.write("="*120 + "\n") f.write(f"๐ŸŒ URL: {result.get('url', 'N/A')}\n") f.write(f"๐Ÿ“‹ Status Code: {result.get('status', 'N/A')}\n") f.write(f"๐Ÿ• Scan Time: {result.get('timestamp', 'N/A')}\n") f.write(f"๐Ÿ“ Content Size: {result.get('content_length', 0):,} bytes\n") f.write(f"๐Ÿ“„ Content Type: {result.get('content_type', 'Unknown')}\n") f.write(f"๐Ÿ–ฅ๏ธ Server Header: {result.get('server', 'Unknown')}\n") f.write(f"๐Ÿ”— Final URL (after redirects): {result.get('final_url', 'N/A')}\n\n") if result.get('error'): f.write(f"โŒ โŒ โŒ SCAN ERROR โŒ โŒ โŒ\n") f.write(f"Error: {result['error']}\n\n") return filename # ==================== FINGERPRINTING ==================== fingerprint = result.get('fingerprint', {}) if fingerprint: f.write("๐Ÿ–ฅ๏ธ ๐Ÿ–ฅ๏ธ ๐Ÿ–ฅ๏ธ ADVANCED PLATFORM FINGERPRINTING ๐Ÿ–ฅ๏ธ ๐Ÿ–ฅ๏ธ ๐Ÿ–ฅ๏ธ\n") f.write("="*120 + "\n") tech_mapping = [ ('๐ŸŒ CDN Provider', 'cdn'), ('๐Ÿ›ก๏ธ WAF Protection', 'waf'), ('๐Ÿ’ป Programming Language', 'language'), ('๐Ÿ—๏ธ Web Framework', 'framework'), ('๐Ÿ–ฅ๏ธ Server Software', 'server_software'), ] for display_name, key in tech_mapping: if fingerprint.get(key): f.write(f"โ€ข {display_name}: {fingerprint[key]}\n") if fingerprint.get('technologies'): f.write(f"\n๐Ÿ› ๏ธ ALL DETECTED TECHNOLOGIES:\n") for tech in fingerprint['technologies']: f.write(f" โœ“ {tech}\n") f.write(f"\n๐Ÿ“Š FINGERPRINT RISK SCORE: {fingerprint.get('risk_score', 0):.2f}/1.0\n") f.write("\n" + "="*120 + "\n\n") # ==================== HEADERS ANALYSIS ==================== headers_data = result.get('findings', {}).get('headers', {}) if headers_data: f.write("๐Ÿ“‹ ๐Ÿ“‹ ๐Ÿ“‹ COMPLETE HEADERS ANALYSIS ๐Ÿ“‹ ๐Ÿ“‹ ๐Ÿ“‹\n") f.write("="*120 + "\n") # ALL Security Headers Present if headers_data.get('security_headers'): f.write("โœ… โœ… โœ… PRESENT SECURITY HEADERS:\n") f.write("-"*80 + "\n") for header, data in headers_data['security_headers'].items(): f.write(f"\n๐Ÿ”น {header}:\n") f.write(f" Value: {data.get('value', '')}\n") f.write(f" Risk Level: {data.get('risk', 'unknown').upper()}\n") f.write("\n") # COMPLETE LIST of Missing Security Headers if headers_data.get('missing_headers'): f.write("โŒ โŒ โŒ MISSING SECURITY HEADERS:\n") f.write("-"*80 + "\n") for idx, header in enumerate(headers_data['missing_headers'], 1): f.write(f"{idx:2d}. {header}\n") # Detailed explanations for EACH missing header security_headers_explanation = { 'Strict-Transport-Security': { 'risk': 'CRITICAL', 'description': 'Prevents SSL stripping attacks and protocol downgrade attacks', 'impact': 'Without HSTS, attackers can force HTTPS sites to HTTP', 'recommendation': 'Implement: max-age=31536000; includeSubDomains; preload' }, 'Content-Security-Policy': { 'risk': 'CRITICAL', 'description': 'Prevents XSS, clickjacking, and other code injection attacks', 'impact': 'Site vulnerable to cross-site scripting attacks', 'recommendation': 'Implement strict CSP with proper directives' }, 'X-Frame-Options': { 'risk': 'HIGH', 'description': 'Prevents clickjacking attacks by controlling framing', 'impact': 'Site can be embedded in malicious frames', 'recommendation': 'Set to: DENY or SAMEORIGIN' }, 'X-Content-Type-Options': { 'risk': 'MEDIUM', 'description': 'Prevents MIME type sniffing attacks', 'impact': 'Browsers may interpret files incorrectly', 'recommendation': 'Set to: nosniff' }, 'Referrer-Policy': { 'risk': 'MEDIUM', 'description': 'Controls how much referrer information is sent', 'impact': 'Potential information leakage through referrer headers', 'recommendation': 'Set to: strict-origin-when-cross-origin' } } f.write("\n๐Ÿ“ ๐Ÿ“ ๐Ÿ“ DETAILED EXPLANATION OF MISSING HEADERS ๐Ÿ“ ๐Ÿ“ ๐Ÿ“\n") f.write("-"*80 + "\n") for header in headers_data['missing_headers']: if header in security_headers_explanation: info = security_headers_explanation[header] f.write(f"\n๐Ÿ”ธ {header}:\n") f.write(f" Risk Level: {info['risk']}\n") f.write(f" Description: {info['description']}\n") f.write(f" Impact: {info['impact']}\n") f.write(f" Recommendation: {info['recommendation']}\n") f.write("\n") # Server Information with COMPLETE details if headers_data.get('server_info', {}).get('server'): server = headers_data['server_info']['server'] f.write("๐Ÿ–ฅ๏ธ ๐Ÿ–ฅ๏ธ ๐Ÿ–ฅ๏ธ SERVER INFORMATION ๐Ÿ–ฅ๏ธ ๐Ÿ–ฅ๏ธ ๐Ÿ–ฅ๏ธ\n") f.write("-"*80 + "\n") f.write(f"Server Header: {server}\n") # Extract and display ALL version information version_patterns = [ r'(\d+\.\d+(?:\.\d+)?(?:\.\d+)?)', # Standard version r'v(\d+)', # vX format r'(\d{8})', # Date format r'(\d{4}[a-z]?)' # Year + optional letter ] found_versions = [] for pattern in version_patterns: matches = re.findall(pattern, server) found_versions.extend(matches) if found_versions: f.write("\nโš ๏ธ โš ๏ธ โš ๏ธ EXPOSED VERSION INFORMATION โš ๏ธ โš ๏ธ โš ๏ธ\n") f.write("The following version information was exposed:\n") for version in found_versions: if isinstance(version, tuple): version = version[0] f.write(f" โ€ข Version: {version}\n") f.write("\n๐Ÿšจ SECURITY IMPLICATIONS:\n") f.write("โ€ข Attackers can target specific vulnerabilities for this version\n") f.write("โ€ข Automated scanners can identify known exploits\n") f.write("โ€ข Version disclosure violates security best practices\n") f.write("\n" + "="*120 + "\n\n") # ==================== SECURITY ANALYSIS ==================== security = result.get('findings', {}).get('security', {}) if security: f.write("๐Ÿ”’ ๐Ÿ”’ ๐Ÿ”’ COMPREHENSIVE SECURITY ANALYSIS ๐Ÿ”’ ๐Ÿ”’ ๐Ÿ”’\n") f.write("="*120 + "\n") f.write(f"๐ŸŽฏ OVERALL RISK LEVEL: {security.get('risk_level', 'low').upper()}\n") f.write(f"๐Ÿ“ˆ RISK SCORE: {security.get('risk_score', 0):.2f}/1.0\n\n") if security.get('issues'): f.write("โš ๏ธ โš ๏ธ โš ๏ธ SECURITY ISSUES FOUND โš ๏ธ โš ๏ธ โš ๏ธ\n") f.write("-"*80 + "\n") for idx, issue in enumerate(security.get('issues', []), 1): f.write(f"{idx:2d}. {issue}\n") f.write("\n") # ==================== COMPLETE MEMORY LEAK PATTERNS ==================== if security.get('memory_patterns'): f.write("๐Ÿšจ ๐Ÿšจ ๐Ÿšจ CLOUDBLEED MEMORY LEAK PATTERNS DETECTED ๐Ÿšจ ๐Ÿšจ ๐Ÿšจ\n") f.write("="*120 + "\n") f.write("โš ๏ธ WARNING: These patterns indicate potential Cloudflare memory leaks\n") f.write("โ„น๏ธ Similar to the 2017 Cloudbleed incident where uninitialized memory\n") f.write(" was dumped by Cloudflare reverse proxies\n") f.write("="*120 + "\n\n") memory_patterns = security.get('memory_patterns', []) f.write(f"๐Ÿ“Š TOTAL MEMORY LEAK PATTERNS FOUND: {len(memory_patterns)}\n\n") for idx, pattern_info in enumerate(memory_patterns, 1): if isinstance(pattern_info, dict): pattern = pattern_info.get('pattern', '') length = pattern_info.get('length', 0) pattern_type = pattern_info.get('type', 'unknown') else: pattern = pattern_info length = len(pattern) pattern_type = 'unknown' f.write(f"\n{'='*80}\n") f.write(f"PATTERN {idx}/{len(memory_patterns)}\n") f.write(f"{'='*80}\n") f.write(f"Type: {pattern_type}\n") f.write(f"Length: {length} characters\n") f.write(f"MD5 Hash: {hashlib.md5(pattern.encode()).hexdigest()}\n") f.write(f"\n{'โ”€'*80}\n") f.write("COMPLETE PATTERN CONTENT (NO TRUNCATION):\n") f.write(f"{'โ”€'*80}\n") # Display COMPLETE pattern without truncation if length > 500: f.write(f"\nFIRST 1000 CHARACTERS:\n") f.write(pattern[:1000] + "\n") f.write(f"\n... [CONTINUED] ...\n\n") f.write(f"MIDDLE 1000 CHARACTERS:\n") mid_start = length // 2 - 500 f.write(pattern[mid_start:mid_start + 1000] + "\n") f.write(f"\n... [CONTINUED] ...\n\n") f.write(f"LAST 1000 CHARACTERS:\n") f.write(pattern[-1000:] + "\n") f.write(f"\nFULL LENGTH: {length} characters\n") else: f.write(pattern + "\n") # Hex representation for binary patterns if any(ord(c) < 32 or ord(c) > 126 for c in pattern[:100]): f.write(f"\n{'โ”€'*80}\n") f.write("HEX REPRESENTATION (first 500 chars):\n") hex_repr = pattern[:500].encode('utf-8', errors='ignore').hex() f.write(CompleteReportSaver.format_hex_string(hex_repr) + "\n") f.write(f"{'='*80}\n") f.write("\n๐Ÿ“ ๐Ÿ“ ๐Ÿ“ CLOUDBLEED RISK ASSESSMENT ๐Ÿ“ ๐Ÿ“ ๐Ÿ“\n") f.write("="*120 + "\n") f.write("๐Ÿ” PATTERN ANALYSIS:\n") f.write("โ€ข Long hex strings (>32 chars) may indicate memory dumps\n") f.write("โ€ข Null byte sequences (\\x00\\x00) may indicate uninitialized memory\n") f.write("โ€ข Non-printable characters may indicate binary data leaks\n") f.write("โ€ข UUID/GUID patterns may indicate memory addressing\n") f.write("โ€ข Repetitive patterns may indicate memory structures\n\n") f.write("๐Ÿšจ SECURITY IMPLICATIONS:\n") f.write("โ€ข Sensitive data (passwords, tokens, keys) may be exposed\n") f.write("โ€ข Session cookies and authentication tokens may be leaked\n") f.write("โ€ข Internal IP addresses and network information may be exposed\n") f.write("โ€ข Database credentials and API keys may be compromised\n") f.write("โ€ข Cloudflare sites with these patterns need IMMEDIATE investigation\n\n") f.write("๐Ÿ”ง RECOMMENDED ACTIONS:\n") f.write("1. Contact Cloudflare support immediately\n") f.write("2. Rotate ALL API keys and credentials\n") f.write("3. Invalidate ALL session tokens\n") f.write("4. Monitor for unauthorized access\n") f.write("5. Consider moving critical services off Cloudflare\n") f.write("\n" + "="*120 + "\n\n") if security.get('recommendations'): f.write("๐Ÿ’ก ๐Ÿ’ก ๐Ÿ’ก SECURITY RECOMMENDATIONS ๐Ÿ’ก ๐Ÿ’ก ๐Ÿ’ก\n") f.write("-"*80 + "\n") for idx, rec in enumerate(security.get('recommendations', []), 1): f.write(f"{idx:2d}. {rec}\n") f.write("\n") # ==================== COMPLETE SENSITIVE DATA ==================== sensitive_data = result.get('findings', {}).get('sensitive_data', {}) if sensitive_data: f.write("๐Ÿšจ ๐Ÿšจ ๐Ÿšจ COMPLETE SENSITIVE DATA DETECTED ๐Ÿšจ ๐Ÿšจ ๐Ÿšจ\n") f.write("="*120 + "\n") f.write("โš ๏ธ WARNING: The following sensitive data was found in the response\n") f.write(" This indicates potential data leakage or misconfiguration\n") f.write("="*120 + "\n\n") total_items = sum(len(items) for items in sensitive_data.values()) f.write(f"๐Ÿ“Š TOTAL SENSITIVE ITEMS FOUND: {total_items}\n\n") for category, items in sensitive_data.items(): if items: f.write(f"\n{'='*80}\n") f.write(f"๐Ÿ“ CATEGORY: {category.upper()} - {len(items)} ITEMS\n") f.write(f"{'='*80}\n\n") for idx, item in enumerate(items, 1): f.write(f"\n{'โ”€'*40} ITEM {idx} {'โ”€'*40}\n") if isinstance(item, dict): value = item.get('value', 'N/A') context = item.get('context', '') confidence = item.get('confidence', 0) f.write(f"CONFIDENCE LEVEL: {confidence:.0%}\n") f.write(f"RISK: {'HIGH' if confidence > 0.7 else 'MEDIUM' if confidence > 0.4 else 'LOW'}\n") f.write(f"\nVALUE (COMPLETE - NO TRUNCATION):\n") f.write(f"{'โ”€'*80}\n") f.write(f"{value}\n") f.write(f"{'โ”€'*80}\n") # Special detailed handling for JWT tokens if category == 'tokens' and value.startswith('eyJ'): f.write(f"\n๐Ÿ” JWT TOKEN ANALYSIS:\n") decoded = CompleteReportSaver.decode_jwt(value) if decoded.get('error'): f.write(f"JWT Decode Error: {decoded['error']}\n") else: if decoded.get('header'): f.write(f"\nJWT HEADER:\n") f.write(json.dumps(decoded['header'], indent=2, ensure_ascii=False) + "\n") if decoded.get('payload'): f.write(f"\nJWT PAYLOAD:\n") f.write(json.dumps(decoded['payload'], indent=2, ensure_ascii=False) + "\n") # Extract claims for analysis payload = decoded['payload'] if isinstance(payload, dict): if 'exp' in payload: exp_time = datetime.fromtimestamp(payload['exp']) f.write(f"\nโฐ TOKEN EXPIRATION: {exp_time} (UTC)\n") if 'iss' in payload: f.write(f"๐Ÿ“ ISSUER: {payload['iss']}\n") if 'sub' in payload: f.write(f"๐Ÿ‘ค SUBJECT: {payload['sub']}\n") # Special detailed handling for API keys elif category == 'api_keys': f.write(f"\n๐Ÿ”‘ API KEY ANALYSIS:\n") if value.startswith('AKIA'): f.write("TYPE: AWS Access Key ID\n") f.write("FORMAT: AKIA[16 uppercase alphanumeric characters]\n") f.write("๐Ÿšจ CRITICAL RISK: This should NEVER be exposed in client-side code\n") f.write("IMPACT: Full AWS account compromise possible\n") f.write("ACTION REQUIRED: Rotate IMMEDIATELY via AWS IAM\n") elif value.startswith('sk_'): f.write("TYPE: Stripe Secret Key\n") if 'live' in value.lower(): f.write("๐Ÿšจ CRITICAL: This is a LIVE production Stripe key!\n") f.write("IMPACT: Complete payment processing compromise\n") f.write("ACTION REQUIRED: Rotate IMMEDIATELY in Stripe Dashboard\n") else: f.write("โš ๏ธ WARNING: Test Stripe key exposed\n") elif len(value) >= 32 and re.match(r'^[a-fA-F0-9]+$', value): f.write("TYPE: Hexadecimal API Key\n") f.write(f"LENGTH: {len(value)} characters\n") f.write("FORMAT: Hexadecimal string\n") # Special handling for credentials elif category == 'credentials': f.write(f"\n๐Ÿ” CREDENTIAL ANALYSIS:\n") f.write(f"LENGTH: {len(value)} characters\n") if len(value) < 8: f.write("โš ๏ธ WARNING: Password is too short\n") if re.search(r'\d', value): f.write("โœ“ Contains numbers\n") if re.search(r'[A-Z]', value): f.write("โœ“ Contains uppercase letters\n") if re.search(r'[a-z]', value): f.write("โœ“ Contains lowercase letters\n") if re.search(r'[^A-Za-z0-9]', value): f.write("โœ“ Contains special characters\n") # Add context if available if context and context.strip(): f.write(f"\n๐Ÿ“„ CONTEXT (surrounding code/text):\n") f.write(f"{'โ”€'*80}\n") f.write(f"{context}\n") f.write(f"{'โ”€'*80}\n") else: # Non-dict item - display complete f.write(f"VALUE (COMPLETE):\n") f.write(f"{'โ”€'*80}\n") f.write(f"{str(item)}\n") f.write(f"{'โ”€'*80}\n") f.write(f"\n{'โ”€'*80}\n") f.write(f"\n{'='*80}\n\n") # ==================== CLOUDFLARE DETECTION ==================== cloudflare = result.get('findings', {}).get('cloudflare', {}) if cloudflare: f.write("๐Ÿ›ก๏ธ ๐Ÿ›ก๏ธ ๐Ÿ›ก๏ธ CLOUDFLARE DETECTION ANALYSIS ๐Ÿ›ก๏ธ ๐Ÿ›ก๏ธ ๐Ÿ›ก๏ธ\n") f.write("="*120 + "\n") f.write(f"๐Ÿ” CLOUDFLARE DETECTED: {'YES' if cloudflare.get('detected') else 'NO'}\n") f.write(f"๐Ÿ“Š CONFIDENCE LEVEL: {cloudflare.get('confidence', 0):.0%}\n\n") if cloudflare.get('detected'): f.write("โš ๏ธ CLOUDFLARE DETECTION IMPLICATIONS:\n") f.write("โ€ข Site is behind Cloudflare's reverse proxy network\n") f.write("โ€ข Potential for Cloudbleed-style memory leaks exists\n") f.write("โ€ข Cloudflare-specific cookies and headers present\n") f.write("โ€ข WAF protection (if enabled) may be in place\n\n") if cloudflare.get('indicators'): f.write("๐Ÿ“‹ CLOUDFLARE INDICATORS FOUND:\n") f.write("-"*80 + "\n") for idx, indicator in enumerate(cloudflare.get('indicators', []), 1): f.write(f"{idx:2d}. {indicator}\n") f.write("\n") # Cloudflare-specific risk assessment f.write("๐Ÿšจ CLOUDFLARE-SPECIFIC RISK ASSESSMENT:\n") f.write("-"*80 + "\n") if sensitive_data: f.write("โŒ HIGH RISK: Sensitive data found on Cloudflare-protected site\n") f.write(" This is a potential Cloudbleed scenario\n") elif security.get('memory_patterns'): f.write("โš ๏ธ MEDIUM RISK: Memory leak patterns detected\n") f.write(" Could indicate uninitialized memory exposure\n") else: f.write("โœ… LOW RISK: No immediate Cloudbleed indicators\n") f.write("\n") # ==================== INTELLIGENCE DATA ==================== intelligence = result.get('intelligence', {}) if intelligence: f.write("๐Ÿง  ๐Ÿง  ๐Ÿง  THREAT INTELLIGENCE ANALYSIS ๐Ÿง  ๐Ÿง  ๐Ÿง \n") f.write("="*120 + "\n") f.write(f"๐Ÿ“Š IOC SCORE: {intelligence.get('ioc_score', 0):.2f}/1.0\n") f.write(f"๐ŸŽฏ THREAT LEVEL: {intelligence.get('threat_level', 'low').upper()}\n\n") ioc_classification = intelligence.get('ioc_classification', {}) if any(ioc_classification.values()): f.write("๐Ÿ” IOC CLASSIFICATION:\n") f.write("-"*80 + "\n") for level, items in ioc_classification.items(): if items: f.write(f"\n{level.upper()} IOCS ({len(items)}):\n") for idx, item in enumerate(items[:10], 1): f.write(f" {idx:2d}. {item}\n") f.write("\n") mitre_tactics = intelligence.get('mitre_tactics', []) if mitre_tactics: f.write("๐ŸŽฏ MITRE ATT&CK TACTIC MAPPING:\n") f.write("-"*80 + "\n") for tactic in mitre_tactics: f.write(f"\nโ€ข {tactic.get('id', 'N/A')} - {tactic.get('name', 'N/A')}\n") f.write(f" Confidence: {tactic.get('confidence', 0):.0%}\n") f.write(f" Techniques: {', '.join(tactic.get('techniques', []))}\n") f.write("\n") # ==================== RAW RESPONSE DATA ==================== f.write("๐Ÿ“„ ๐Ÿ“„ ๐Ÿ“„ RAW RESPONSE METADATA ๐Ÿ“„ ๐Ÿ“„ ๐Ÿ“„\n") f.write("="*120 + "\n") f.write(f"Response Size: {result.get('content_length', 0):,} bytes\n") f.write(f"Response Type: {result.get('content_type', 'Unknown')}\n") if 'content_hash' in result: f.write(f"Content MD5: {result['content_hash']}\n") f.write(f"\nScan Completed: {datetime.now().isoformat()}\n") # ==================== REPORT FOOTER ==================== f.write("\n" + "="*120 + "\n") f.write("๐Ÿ“‹ REPORT SUMMARY\n") f.write("="*120 + "\n") summary_points = [] if security.get('risk_level') == 'high': summary_points.append("๐Ÿšจ HIGH RISK - Immediate action required") elif security.get('risk_level') == 'medium': summary_points.append("โš ๏ธ MEDIUM RISK - Investigation recommended") else: summary_points.append("โœ… LOW RISK - Regular monitoring suggested") if sensitive_data: total_sensitive = sum(len(items) for items in sensitive_data.values()) summary_points.append(f"๐Ÿ”“ {total_sensitive} sensitive data items found") if security.get('memory_patterns'): summary_points.append(f"๐Ÿ’พ {len(security['memory_patterns'])} memory leak patterns detected") if cloudflare.get('detected'): summary_points.append("๐Ÿ›ก๏ธ Cloudflare protection detected") for idx, point in enumerate(summary_points, 1): f.write(f"{idx}. {point}\n") f.write("\n" + "="*120 + "\n") f.write("๐Ÿ END OF COMPLETE CLOUDBLEED SCAN REPORT\n") f.write(f"๐Ÿ“… Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S %Z')}\n") f.write("="*120 + "\n") print(f"\n๐Ÿ’พ COMPLETE report saved to: {filename}") print(f"๐Ÿ“„ File size: {os.path.getsize(filename):,} bytes") return filename class IntelligenceCache: """Simple caching system to avoid duplicate requests""" def __init__(self, cache_dir: str = ".cache"): self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(exist_ok=True) self.db_path = self.cache_dir / "intel_cache.db" self.init_db() def init_db(self): """Initialize SQLite database""" conn = sqlite3.connect(str(self.db_path)) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS scan_cache ( url_hash TEXT PRIMARY KEY, url TEXT NOT NULL, data TEXT NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() conn.close() def get_cached_scan(self, url: str) -> Optional[Dict]: """Get cached scan results""" url_hash = hashlib.md5(url.encode()).hexdigest() conn = sqlite3.connect(str(self.db_path)) cursor = conn.cursor() cursor.execute( "SELECT data FROM scan_cache WHERE url_hash = ? AND timestamp > datetime('now', '-1 day')", (url_hash,) ) result = cursor.fetchone() conn.close() if result: return json.loads(result[0]) return None def cache_scan(self, url: str, data: Dict): """Cache scan results""" url_hash = hashlib.md5(url.encode()).hexdigest() conn = sqlite3.connect(str(self.db_path)) cursor = conn.cursor() cursor.execute( "REPLACE INTO scan_cache (url_hash, url, data) VALUES (?, ?, ?)", (url_hash, url, json.dumps(data, default=str)) ) conn.commit() conn.close() class AntiNoiseFilter: """Advanced anti-noise and false positive filter""" def __init__(self): self.js_false_positives = { 'password': [ r'password.*placeholder', r'password.*example', r'password.*test', r'password.*demo', r'type=.*password', r'input.*password', r'confirm.*password', r'new.*password', r'old.*password', r'change.*password' ], 'api_key': [ r'api.*key.*example', r'api.*key.*test', r'api.*key.*demo', r'your.*api.*key', r'insert.*api.*key', r'paste.*api.*key', r'sample.*api.*key' ], 'token': [ r'token.*example', r'token.*test', r'token.*demo', r'your.*token', r'sample.*token', r'paste.*token' ] } self.context_patterns = { 'high_confidence': [ r'[\"\']\s*:\s*[\"\']', r'=\s*[\"\']', r'const\s+\w+\s*=\s*[\"\']', r'let\s+\w+\s*=\s*[\"\']', r'var\s+\w+\s*=\s*[\"\']', r'process\.env\.', r'config\[[\"\']', r'\.get\([\"\']', ], 'low_confidence': [ r'placeholder=', r'example', r'sample', r'test', r'demo', r'changeme', r'your_.*here' ] } def filter_sensitive_data(self, category: str, value: str, context: str = "") -> bool: """Filter out false positives""" value_lower = value.lower() context_lower = context.lower() if any(fp in value_lower for fp in ['example', 'test', 'demo', 'placeholder', 'changeme']): return False if category in self.js_false_positives: for pattern in self.js_false_positives[category]: if re.search(pattern, context_lower, re.IGNORECASE): return False high_confidence = any( re.search(pattern, context_lower) for pattern in self.context_patterns['high_confidence'] ) low_confidence = any( re.search(pattern, context_lower) for pattern in self.context_patterns['low_confidence'] ) if category == 'api_keys': if not re.match(r'^[A-Za-z0-9_\-]{20,50}$', value): return False if len(value) < 20 or len(value) > 100: return False elif category == 'tokens': if value.startswith('eyJ'): return True if len(value) < 32: return False elif category == 'passwords': if len(value) < 8: return False if any(x in context_lower for x in ['var ', 'const ', 'let ', 'function']): return False if low_confidence and not high_confidence: return False return True class CompleteRegexPatterns: """Enhanced regex patterns for COMPLETE data capture""" def __init__(self): self.patterns = { 'api_keys': [ r'(?i)(?:aws)?_?(?:access)?_?key["\']?\s*[:=]\s*["\']?(AKIA[0-9A-Z]{16,})["\']?', r'(?i)(?:aws)?_?(?:secret)?_?key["\']?\s*[:=]\s*["\']?([A-Za-z0-9/+]{40,})["\']?', r'(?i)(?:stripe)?_?(?:api)?_?key["\']?\s*[:=]\s*["\']?(sk_(?:live|test)_[0-9a-zA-Z]{24,})["\']?', r'(?i)(?:github)?_?(?:token)?["\']?\s*[:=]\s*["\']?(gh[ps]_[a-zA-Z0-9]{36,})["\']?', r'(?i)["\']?(?:api[_-]?key|apikey)["\']?\s*[:=]\s*["\']?([a-fA-F0-9]{32,128})["\']?', r'(?i)["\']?(?:secret[_-]?key)["\']?\s*[:=]\s*["\']?([a-fA-F0-9]{32,128})["\']?', r'(?i)["\']?(?:private[_-]?key)["\']?\s*[:=]\s*["\']?(\-{5}BEGIN[\s\S]{100,}END[\s\S]+\-{5})["\']?', ], 'tokens': [ r'(?i)["\']?(?:bearer[_-]?token|jwt[_-]?token)["\']?\s*[:=]\s*["\']?(eyJ[a-zA-Z0-9_-]{10,}\.[a-zA-Z0-9_-]{10,}\.[a-zA-Z0-9_-]{10,})["\']?', r'(?i)["\']?authorization["\']?\s*[:=]\s*["\']?Bearer\s+([a-zA-Z0-9_-]{20,}\.[a-zA-Z0-9_-]{20,}\.[a-zA-Z0-9_-]{20,})["\']?', r'(?i)["\']?(?:access[_-]?token)["\']?\s*[:=]\s*["\']?([a-fA-F0-9]{32,512})["\']?', r'(?i)["\']?(?:session[_-]?(?:id|token))["\']?\s*[:=]\s*["\']?([a-fA-F0-9]{32,256})["\']?', r'(?i)["\']?(?:csrf[_-]?token)["\']?\s*[:=]\s*["\']?([a-fA-F0-9]{32,128})["\']?', r'(?i)["\']?(?:refresh[_-]?token)["\']?\s*[:=]\s*["\']?([a-fA-F0-9]{32,256})["\']?', ], 'credentials': [ r'(?i)["\']?(?:db[_-]?(?:pass|password))["\']?\s*[:=]\s*["\']?([^"\'\s]{6,100})["\']?', r'(?i)["\']?(?:database[_-]?(?:pass|password))["\']?\s*[:=]\s*["\']?([^"\'\s]{6,100})["\']?', r'(?i)["\']?(?:admin[_-]?(?:pass|password))["\']?\s*[:=]\s*["\']?([^"\'\s]{6,100})["\']?', r'(?i)["\']?(?:root[_-]?(?:pass|password))["\']?\s*[:=]\s*["\']?([^"\'\s]{6,100})["\']?', r'(?i)["\']?(?:mysql[_-]?(?:pass|password))["\']?\s*[:=]\s*["\']?([^"\'\s]{6,100})["\']?', r'(?i)["\']?(?:postgres[_-]?(?:pass|password))["\']?\s*[:=]\s*["\']?([^"\'\s]{6,100})["\']?', r'(?i)["\']?(?:mongodb[_-]?(?:pass|password))["\']?\s*[:=]\s*["\']?([^"\'\s]{6,100})["\']?', ], 'cloudflare_indicators': [ r'(?i)["\']?__cfduid["\']?\s*[:=]\s*["\']?([a-fA-F0-9]{43})["\']?', r'(?i)["\']?cf_clearance["\']?\s*[:=]\s*["\']?([a-fA-F0-9_-]{40,})["\']?', r'CF-Ray\s*:\s*([a-fA-F0-9]{16}-[A-Z]{3})', r'(?i)cf-cache-status', r'(?i)cf-polished', r'(?i)cf-bgj', ], 'memory_leak_patterns': [ r'[0-9a-fA-F]{32,}', # Long hex strings r'(?s)\x00{4,}', # Null byte sequences r'[^\x20-\x7E]{20,}', # Non-printable sequences r'[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}', # UUIDs r'(?:[0-9a-fA-F]{2}[:\-\s]?){16,}', # MAC addresses or similar r'0x[0-9a-fA-F]{8,16}', # Memory addresses r'[0-9a-fA-F]{16,}', # General hex dumps ], 'ioc_patterns': [ r'\b(?:10\.\d{1,3}\.\d{1,3}\.\d{1,3}|172\.(?:1[6-9]|2[0-9]|3[0-1])\.\d{1,3}\.\d{1,3}|192\.168\.\d{1,3}\.\d{1,3})\b', r'(?i)(?:union\s+select|sleep\(\d+\)|benchmark\(|exec\(|system\(|drop\s+table|insert\s+into)', ], 'emails': [ r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', ], 'phone_numbers': [ r'\b(?:\+?\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b', ] } self.compiled_patterns = {} for category, pattern_list in self.patterns.items(): self.compiled_patterns[category] = [ re.compile(pattern, re.IGNORECASE) for pattern in pattern_list ] class CompleteFingerprintAnalyzer: """Complete fingerprinting analyzer""" def __init__(self): self.cdn_waf_fingerprints = { 'cloudflare': { 'patterns': ['cloudflare', '__cfduid', 'cf-ray', 'cf-cache-status', 'cf-polished', 'cf-bgj'], 'cdn': 'Cloudflare', 'waf': 'Cloudflare WAF', 'risk_score': 0.3, 'cloudbleed_risk': 0.8 }, 'akamai': { 'patterns': ['akamai', 'x-akamai', 'akamaighost', 'x-akamai-transformed'], 'cdn': 'Akamai', 'waf': 'Akamai Kona', 'risk_score': 0.2, 'cloudbleed_risk': 0.1 }, 'sucuri': { 'patterns': ['sucuri', 'x-sucuri-id', 'x-sucuri-cache', 'sucuri/cloudproxy'], 'cdn': 'Sucuri', 'waf': 'Sucuri WAF', 'risk_score': 0.4, 'cloudbleed_risk': 0.3 }, 'fastly': { 'patterns': ['fastly', 'x-fastly', 'surrogate-key'], 'cdn': 'Fastly', 'waf': 'Fastly WAF', 'risk_score': 0.2, 'cloudbleed_risk': 0.2 } } self.language_fingerprints = { 'php': { 'headers': ['x-powered-by: php', 'server: php', 'x-php-version'], 'patterns': [r'\.php\b', r'\?php', r'php_\w+', r'PHP Version'], }, 'asp.net': { 'headers': ['x-powered-by: asp.net', 'x-aspnet-version', 'server: microsoft-iis', 'x-aspnetmvc-version'], 'patterns': [r'\.aspx\b', r'\.ashx\b', r'__doPostBack', r'ViewState'], }, 'node.js': { 'headers': ['x-powered-by: express', 'server: node', 'x-node-version'], 'patterns': [r'node\.js', r'require\(', r'module\.exports', r'process\.env'], }, 'python': { 'headers': ['x-powered-by: python', 'server: gunicorn', 'server: uwsgi', 'x-python-version'], 'patterns': [r'def\s+\w+\(', r'import\s+\w+', r'from\s+\w+', r'__pycache__'], }, 'java': { 'headers': ['x-powered-by: jsp', 'server: tomcat', 'server: jetty', 'x-java-version'], 'patterns': [r'\.jsp\b', r'\.do\b', r'javax\.servlet', r'java\.'], }, } self.framework_fingerprints = { 'laravel': { 'patterns': ['laravel', 'csrf-token', 'mix-manifest.json', 'App\\Http'], 'headers': ['x-powered-by: laravel'], }, 'django': { 'patterns': ['django', 'csrfmiddlewaretoken', 'settings.py', 'wsgi.py'], 'headers': ['x-powered-by: django'], }, 'wordpress': { 'patterns': ['wordpress', 'wp-content', 'wp-includes', 'wp-json', 'wp-admin'], 'headers': ['x-powered-by: wordpress'], }, 'react': { 'patterns': ['react', 'react-dom', '__NEXT_DATA__', 'webpack'], 'headers': [], }, 'vue.js': { 'patterns': ['vue', 'vue-router', 'vuex', 'nuxt'], 'headers': [], }, } def analyze(self, headers: Dict, content: str, url: str) -> Dict: """Comprehensive fingerprint analysis with complete data""" fingerprint = { 'cdn': None, 'waf': None, 'language': None, 'framework': None, 'server_software': None, 'technologies': [], 'risk_score': 0.0, 'cloudbleed_risk': 0.0, 'header_details': {}, 'content_indicators': [] } headers_lower = {k.lower(): v.lower() for k, v in headers.items()} content_lower = content.lower() # CDN/WAF Detection for service, data in self.cdn_waf_fingerprints.items(): for pattern in data['patterns']: pattern_lower = pattern.lower() # Check headers for header_name, header_value in headers_lower.items(): if pattern_lower in header_name or pattern_lower in header_value: fingerprint['cdn'] = data['cdn'] fingerprint['waf'] = data['waf'] fingerprint['risk_score'] += data['risk_score'] fingerprint['cloudbleed_risk'] += data['cloudbleed_risk'] fingerprint['header_details'][f'cdn_waf_{service}'] = { 'header': header_name, 'value': header_value, 'pattern': pattern } break # Check content if pattern_lower in content_lower: fingerprint['cdn'] = data['cdn'] fingerprint['waf'] = data['waf'] fingerprint['risk_score'] += data['risk_score'] fingerprint['cloudbleed_risk'] += data['cloudbleed_risk'] fingerprint['content_indicators'].append(f"Content contains '{pattern}'") # Server Software for header_name, header_value in headers.items(): if 'server' in header_name.lower(): fingerprint['server_software'] = header_value fingerprint['header_details']['server'] = { 'header': header_name, 'value': header_value } # Detailed server analysis server_lower = header_value.lower() if 'nginx' in server_lower: fingerprint['technologies'].append('nginx') version_match = re.search(r'nginx/(\d+\.\d+(?:\.\d+)?)', server_lower) if version_match: fingerprint['header_details']['server']['version'] = version_match.group(1) elif 'apache' in server_lower: fingerprint['technologies'].append('apache') version_match = re.search(r'apache/(\d+\.\d+(?:\.\d+)?)', server_lower) if version_match: fingerprint['header_details']['server']['version'] = version_match.group(1) elif 'iis' in server_lower or 'microsoft' in server_lower: fingerprint['technologies'].append('iis') elif 'cloudflare' in server_lower: fingerprint['technologies'].append('cloudflare') elif 'gunicorn' in server_lower: fingerprint['technologies'].append('gunicorn') elif 'tomcat' in server_lower: fingerprint['technologies'].append('tomcat') # Programming Language Detection for lang, data in self.language_fingerprints.items(): detected = False # Check headers for header_pattern in data['headers']: header_key, header_value = header_pattern.split(': ', 1) if ': ' in header_pattern else (header_pattern, '') for header_name, actual_value in headers_lower.items(): if header_key.lower() in header_name and header_value in actual_value: fingerprint['language'] = lang fingerprint['technologies'].append(lang) detected = True fingerprint['header_details'][f'language_{lang}'] = { 'header': header_name, 'value': actual_value } break if detected: break # Check content patterns if not detected: for pattern in data['patterns']: if re.search(pattern, content_lower, re.IGNORECASE): fingerprint['language'] = lang fingerprint['technologies'].append(lang) fingerprint['content_indicators'].append(f"Language pattern: {pattern}") break # Framework Detection for framework, data in self.framework_fingerprints.items(): detected = False # Check headers for header_pattern in data['headers']: if ': ' in header_pattern: header_key, header_value = header_pattern.split(': ', 1) for header_name, actual_value in headers_lower.items(): if header_key.lower() in header_name and header_value in actual_value: fingerprint['framework'] = framework fingerprint['technologies'].append(framework) detected = True break if detected: break # Check content patterns if not detected: for pattern in data['patterns']: if pattern.lower() in content_lower: fingerprint['framework'] = framework fingerprint['technologies'].append(framework) fingerprint['content_indicators'].append(f"Framework pattern: {pattern}") break # Remove duplicates and sort fingerprint['technologies'] = sorted(list(set(fingerprint['technologies']))) # Calculate risk scores fingerprint['cloudbleed_risk'] = min(fingerprint['cloudbleed_risk'], 1.0) fingerprint['risk_score'] = min(fingerprint['risk_score'], 1.0) return fingerprint class CompleteIntelligenceScorer: """Complete intelligence scoring with MITRE ATT&CK mapping""" def __init__(self): self.mitre_tactics = [ MITRETactic( id="TA0043", name="Reconnaissance", techniques=["T1595", "T1592", "T1589"], confidence=0.7 ), MITRETactic( id="TA0009", name="Collection", techniques=["T1213", "T1005", "T1114"], confidence=0.8 ), MITRETactic( id="TA0010", name="Exfiltration", techniques=["T1041", "T1020", "T1030"], confidence=0.6 ), ] self.ioc_weights = { 'critical': { 'api_keys': 0.95, 'database_credentials': 0.85, 'memory_leak': 0.98, 'cloudflare_leak': 0.92, 'jwt_tokens': 0.88, 'private_keys': 0.96 }, 'suspicious': { 'internal_ips': 0.65, 'suspicious_patterns': 0.55, 'missing_security_headers': 0.45, 'exposed_technologies': 0.35, 'emails': 0.25, 'phone_numbers': 0.20 }, 'low_risk': { 'contact_info': 0.15, 'general_patterns': 0.25, 'info_disclosure': 0.20, 'version_exposure': 0.30 } } def calculate_ioc_score(self, findings: Dict, fingerprint: Dict) -> Tuple[float, IOCClassification, List[MITRETactic]]: """Calculate comprehensive intelligence score with complete analysis""" ioc_classification = IOCClassification([], [], []) matched_tactics = [] total_score = 0.0 # Critical IOCs critical_score = 0.0 critical_items = [] if findings.get('sensitive_data'): for category, items in findings['sensitive_data'].items(): if category in self.ioc_weights['critical']: weight = self.ioc_weights['critical'][category] item_count = len(items) critical_score += weight * min(item_count / 5, 1.0) for item in items[:10]: # First 10 items if isinstance(item, dict): value = item.get('value', 'N/A') confidence = item.get('confidence', 0) critical_items.append(f"{category} ({confidence:.0%}): {value}") else: critical_items.append(f"{category}: {str(item)}") # Add all critical items to classification ioc_classification.critical = critical_items if findings.get('security', {}).get('risk_level') == 'high': critical_score += 0.75 ioc_classification.critical.append("HIGH SECURITY RISK CONFIGURATION") # Suspicious IOCs suspicious_score = 0.0 suspicious_items = [] if fingerprint.get('risk_score', 0) > 0.5: suspicious_score += 0.45 suspicious_items.append(f"High-risk infrastructure fingerprint (Score: {fingerprint['risk_score']:.2f})") if findings.get('headers', {}).get('missing_headers'): missing_count = len(findings['headers']['missing_headers']) suspicious_score += min(missing_count * 0.12, 0.6) suspicious_items.append(f"Missing {missing_count} critical security headers") if fingerprint.get('header_details', {}).get('server', {}).get('version'): suspicious_score += 0.25 suspicious_items.append(f"Server version exposed: {fingerprint['header_details']['server']['version']}") # Add all suspicious items ioc_classification.suspicious = suspicious_items # Cloudflare-specific leak risk cloudflare_leak_score = 0.0 if fingerprint.get('cdn') == 'Cloudflare': if findings.get('sensitive_data'): cloudflare_leak_score += 0.85 ioc_classification.critical.append("CLOUDFLARE WITH SENSITIVE DATA EXPOSURE - POTENTIAL CLOUDBLEED") memory_patterns = findings.get('security', {}).get('memory_patterns', []) if memory_patterns: cloudflare_leak_score += 0.95 ioc_classification.critical.append(f"POTENTIAL CLOUDBLEED MEMORY LEAK PATTERNS DETECTED ({len(memory_patterns)} patterns)") cloudflare_leak_score += fingerprint.get('cloudbleed_risk', 0) * 0.5 # MITRE Tactic Mapping if critical_score > 0.6: matched_tactics.append(self.mitre_tactics[1]) # Collection matched_tactics.append(self.mitre_tactics[2]) # Exfiltration if suspicious_score > 0.4: matched_tactics.append(self.mitre_tactics[0]) # Reconnaissance if cloudflare_leak_score > 0.5: matched_tactics.append(self.mitre_tactics[1]) # Collection # Calculate total score total_score = ( critical_score * 0.55 + suspicious_score * 0.30 + cloudflare_leak_score * 0.45 ) total_score = min(total_score, 1.0) return total_score, ioc_classification, matched_tactics class CompleteCloudbleedScanner: """Complete Cloudbleed Scanner - Shows ALL data with NO truncation""" def __init__(self, enable_cache: bool = True, enable_intelligence: bool = True): self.enable_cache = enable_cache self.enable_intelligence = enable_intelligence self.cache = IntelligenceCache() if enable_cache else None self.filter = AntiNoiseFilter() self.regex = CompleteRegexPatterns() self.fingerprint_analyzer = CompleteFingerprintAnalyzer() self.intelligence_scorer = CompleteIntelligenceScorer() if enable_intelligence else None self.report_saver = CompleteReportSaver() self.session_timeout = aiohttp.ClientTimeout(total=30) self.scan_headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Cache-Control': 'no-cache', 'Pragma': 'no-cache', 'DNT': '1' } async def scan_url(self, url: str) -> Dict: """Scan URL for Cloudbleed patterns and sensitive data - COMPLETE analysis""" if self.enable_cache: cached = self.cache.get_cached_scan(url) if cached: logger.info(f"Using cached results for {url}") return cached print(f"\n๐Ÿ” ๐Ÿ” ๐Ÿ” Scanning: {url}") print(f"โฐ Start time: {datetime.now().strftime('%H:%M:%S')}") result = { 'url': url, 'timestamp': datetime.now().isoformat(), 'success': False, 'error': None, 'findings': {}, 'intelligence': {}, 'fingerprint': {}, 'content_hash': None } try: connector = aiohttp.TCPConnector(ssl=ssl_context) async with aiohttp.ClientSession( connector=connector, timeout=self.session_timeout, headers=self.scan_headers ) as session: async with session.get(url, allow_redirects=True, ssl=False) as response: content = await response.text() # Calculate content hash result['content_hash'] = hashlib.md5(content.encode()).hexdigest() # Basic information result['status'] = response.status result['content_length'] = len(content) result['content_type'] = response.headers.get('Content-Type', '') result['server'] = response.headers.get('Server', 'Unknown') result['final_url'] = str(response.url) # Store ALL headers all_headers = dict(response.headers) result['all_headers'] = all_headers # Advanced Fingerprinting - COMPLETE fingerprint = self.fingerprint_analyzer.analyze(all_headers, content, url) result['fingerprint'] = fingerprint # Enhanced Content Analysis - COMPLETE sensitive_findings = self.analyze_content_complete(content) if sensitive_findings: result['findings']['sensitive_data'] = sensitive_findings # Header Analysis - COMPLETE header_analysis = self.analyze_headers_complete(all_headers) if header_analysis: result['findings']['headers'] = header_analysis # Cloudflare Detection - COMPLETE cf_detected = await self.detect_cloudflare_complete(response, content) if cf_detected: result['findings']['cloudflare'] = cf_detected # Security Analysis - COMPLETE security_analysis = await self.security_analysis_complete(response, content, fingerprint) if security_analysis: result['findings']['security'] = security_analysis # Intelligence Enrichment - COMPLETE if self.enable_intelligence: intelligence_data = await self.enrich_intelligence_complete(url, response, content, fingerprint, result['findings']) result['intelligence'] = intelligence_data result['success'] = True if self.enable_cache: self.cache.cache_scan(url, result) print(f"โœ… Scan completed: {url}") print(f"๐Ÿ“Š Content size: {result['content_length']:,} bytes") return result except asyncio.TimeoutError: result['error'] = "Request timeout (30 seconds)" return result except aiohttp.ClientError as e: result['error'] = f"Client error: {str(e)}" return result except Exception as e: result['error'] = f"Unexpected error: {str(e)}" logger.exception(f"Error scanning {url}") return result def analyze_content_complete(self, content: str) -> Dict: """Complete content analysis with ALL data - NO truncation""" findings = {} for category, compiled_patterns in self.regex.compiled_patterns.items(): category_matches = [] for pattern in compiled_patterns: # Find ALL matches matches = pattern.finditer(content) for match in matches: if match.group(0): match_text = match.group(0) # Get COMPLETE context (500 chars before and after) start_pos = max(0, match.start() - 500) end_pos = min(len(content), match.end() + 500) context = content[start_pos:end_pos] # Apply anti-noise filtering if self.filter.filter_sensitive_data(category, match_text, context): # Clean the match clean_match = match_text.strip() if len(clean_match) > 3: confidence = self.calculate_confidence_complete(category, clean_match, context) # Store COMPLETE match category_matches.append({ 'value': clean_match, 'context': context, 'confidence': confidence, 'position': match.start(), 'length': len(clean_match), 'hex_representation': clean_match.encode('utf-8', errors='ignore').hex()[:200] }) if category_matches: # Sort by confidence and length category_matches.sort(key=lambda x: (x['confidence'], x['length']), reverse=True) findings[category] = category_matches # ALL matches, no limit return findings def calculate_confidence_complete(self, category: str, value: str, context: str) -> float: """Calculate confidence score with complete analysis""" confidence = 0.5 # Base confidence # Value characteristics if category == 'api_keys': if re.match(r'^AKIA[0-9A-Z]{16}$', value): confidence = 0.98 # AWS Access Key elif re.match(r'^sk_(live|test)_[0-9a-zA-Z]{24}$', value): confidence = 0.95 # Stripe Secret Key elif re.match(r'^gh[ps]_[a-zA-Z0-9]{36,}$', value): confidence = 0.93 # GitHub Token elif len(value) >= 32 and re.match(r'^[a-fA-F0-9]+$', value): confidence = 0.85 elif '-----BEGIN' in value and '-----END' in value: confidence = 0.96 # Private key elif category == 'tokens': if value.startswith('eyJ'): confidence = 0.94 # JWT token # Additional JWT validation parts = value.split('.') if len(parts) == 3: confidence += 0.03 elif len(value) >= 64: confidence = 0.75 elif category == 'credentials': if len(value) >= 12: confidence += 0.15 if re.search(r'[A-Z]', value) and re.search(r'[a-z]', value): confidence += 0.10 if re.search(r'\d', value): confidence += 0.05 if re.search(r'[^A-Za-z0-9]', value): confidence += 0.05 # Context indicators context_lower = context.lower() high_conf_indicators = { 'secret': 0.15, 'key': 0.12, 'token': 0.12, 'password': 0.15, 'credential': 0.10, 'private': 0.10, 'auth': 0.08, 'api': 0.07 } for indicator, boost in high_conf_indicators.items(): if indicator in context_lower: confidence += boost # Negative indicators (reduce confidence) low_conf_indicators = ['example', 'sample', 'test', 'demo', 'placeholder'] for indicator in low_conf_indicators: if indicator in context_lower: confidence *= 0.7 return min(max(confidence, 0.0), 1.0) def analyze_headers_complete(self, headers: Dict) -> Dict: """Complete header analysis with ALL details""" analysis = { 'security_headers': {}, 'missing_headers': [], 'server_info': {}, 'vulnerabilities': [], 'all_headers': [], 'cookie_analysis': [] } # Store ALL headers analysis['all_headers'] = [f"{k}: {v}" for k, v in headers.items()] # Security Headers Configuration security_headers_config = { 'Strict-Transport-Security': { 'required': True, 'risk': 'critical', 'description': 'Prevents SSL stripping and protocol downgrade attacks', 'recommended_value': 'max-age=31536000; includeSubDomains; preload' }, 'Content-Security-Policy': { 'required': True, 'risk': 'critical', 'description': 'Prevents XSS, clickjacking, and code injection attacks', 'recommended_value': "default-src 'self'; script-src 'self'" }, 'X-Frame-Options': { 'required': True, 'risk': 'high', 'description': 'Prevents clickjacking attacks', 'recommended_value': 'DENY or SAMEORIGIN' }, 'X-Content-Type-Options': { 'required': True, 'risk': 'medium', 'description': 'Prevents MIME type sniffing', 'recommended_value': 'nosniff' }, 'Referrer-Policy': { 'required': False, 'risk': 'medium', 'description': 'Controls referrer information leakage', 'recommended_value': 'strict-origin-when-cross-origin' }, 'Permissions-Policy': { 'required': False, 'risk': 'medium', 'description': 'Controls browser features and APIs', 'recommended_value': 'See latest best practices' }, 'X-XSS-Protection': { 'required': False, 'risk': 'low', 'description': 'Legacy XSS protection (deprecated)', 'recommended_value': '0 (disable as CSP is better)' } } # Analyze each security header for header, config in security_headers_config.items(): if header in headers: analysis['security_headers'][header] = { 'value': headers[header], 'risk': config['risk'], 'description': config['description'], 'recommended': config['recommended_value'] } # Check for common misconfigurations if header == 'Strict-Transport-Security': if 'max-age' not in headers[header]: analysis['vulnerabilities'].append(f"HSTS missing max-age directive") if 'includeSubDomains' not in headers[header]: analysis['vulnerabilities'].append(f"HSTS missing includeSubDomains directive") elif header == 'Content-Security-Policy': if "'unsafe-inline'" in headers[header]: analysis['vulnerabilities'].append(f"CSP contains unsafe-inline directive") if "'unsafe-eval'" in headers[header]: analysis['vulnerabilities'].append(f"CSP contains unsafe-eval directive") elif header == 'X-Frame-Options': if headers[header].upper() not in ['DENY', 'SAMEORIGIN']: analysis['vulnerabilities'].append(f"X-Frame-Options has non-standard value: {headers[header]}") elif config['required']: analysis['missing_headers'].append(header) analysis['vulnerabilities'].append( f"Missing {header}: {config['description']}" ) # Server Information with COMPLETE analysis for header_name, header_value in headers.items(): if 'server' in header_name.lower(): analysis['server_info']['header'] = header_name analysis['server_info']['value'] = header_value # Extract ALL version information version_patterns = [ r'(\d+\.\d+(?:\.\d+)?(?:\.\d+)?)', # Standard version r'v(\d+(?:\.\d+)?)', # vX or vX.Y format r'(\d{8})', # Date format (YYYYMMDD) r'(\d{4}[a-z]?)', # Year + optional letter r'(\d{1,2}/\d{1,2}/\d{4})', # Date format ] found_versions = [] for pattern in version_patterns: matches = re.findall(pattern, header_value) found_versions.extend(matches) if found_versions: analysis['server_info']['versions'] = found_versions for version in found_versions: if isinstance(version, tuple): version = version[0] analysis['vulnerabilities'].append( f"Server version exposed: {version}" ) # Cookie Analysis set_cookie_header = headers.get('Set-Cookie', '') if set_cookie_header: cookies = set_cookie_header.split(', ') for cookie in cookies: cookie_analysis = { 'raw': cookie[:200], 'secure': 'Secure' in cookie, 'httponly': 'HttpOnly' in cookie, 'samesite': 'SameSite' in cookie, 'path': None, 'domain': None } # Extract path and domain path_match = re.search(r'path=([^;]+)', cookie, re.IGNORECASE) if path_match: cookie_analysis['path'] = path_match.group(1) domain_match = re.search(r'domain=([^;]+)', cookie, re.IGNORECASE) if domain_match: cookie_analysis['domain'] = domain_match.group(1) analysis['cookie_analysis'].append(cookie_analysis) # Check for insecure cookies if not cookie_analysis['secure']: analysis['vulnerabilities'].append("Cookie missing Secure flag") if not cookie_analysis['httponly']: analysis['vulnerabilities'].append("Cookie missing HttpOnly flag") return analysis async def detect_cloudflare_complete(self, response, content: str) -> Dict: """Complete Cloudflare detection with ALL indicators""" indicators = [] headers_dict = dict(response.headers) # Cloudflare-specific patterns cloudflare_patterns = [ 'cloudflare', '__cfduid', 'cf-ray', 'cf-cache-status', 'cf-polished', 'cf-bgj', 'cf-request-id', 'cf-worker', 'cf-connecting-ip' ] # Check ALL headers for header_name, header_value in headers_dict.items(): header_line = f"{header_name}: {header_value}" header_lower = header_line.lower() for pattern in cloudflare_patterns: if pattern in header_lower: indicators.append({ 'type': 'header', 'pattern': pattern, 'value': header_line }) # Check cookies cookies = headers_dict.get('Set-Cookie', '') if cookies: for pattern in ['__cfduid', 'cf_clearance']: if pattern in cookies: indicators.append({ 'type': 'cookie', 'pattern': pattern, 'value': cookies[:500] + ('...' if len(cookies) > 500 else '') }) # Check content for Cloudflare-specific patterns content_lower = content.lower() content_indicators = [] for pattern in cloudflare_patterns: if pattern in content_lower: # Find all occurrences positions = [m.start() for m in re.finditer(pattern, content_lower)] for pos in positions[:5]: # First 5 occurrences start = max(0, pos - 50) end = min(len(content), pos + 50) context = content[start:end] content_indicators.append(f"'{pattern}' at position {pos}: ...{context}...") if content_indicators: indicators.append({ 'type': 'content', 'patterns': content_indicators[:10] # First 10 content indicators }) # Calculate confidence confidence = min(len(indicators) * 0.25, 1.0) return { 'detected': len(indicators) > 0, 'indicators': indicators, 'confidence': confidence, 'indicator_count': len(indicators) } async def security_analysis_complete(self, response, content: str, fingerprint: Dict) -> Dict: """Complete security analysis with ALL memory leak patterns""" analysis = { 'risk_level': 'low', 'risk_score': 0.0, 'issues': [], 'recommendations': [], 'memory_patterns': [], 'mitre_tactics': [], 'pattern_statistics': {} } # HTTPS Check if str(response.url).startswith('http:'): analysis['issues'].append("โŒ Site not using HTTPS - data transmitted in plain text") analysis['risk_score'] += 0.35 # Missing Security Headers - COMPLETE analysis headers_dict = dict(response.headers) missing_critical = [] critical_headers = ['Strict-Transport-Security', 'Content-Security-Policy', 'X-Frame-Options'] for header in critical_headers: if header not in headers_dict: missing_critical.append(header) if missing_critical: analysis['issues'].append(f"โŒ Missing critical security headers: {', '.join(missing_critical)}") analysis['risk_score'] += len(missing_critical) * 0.15 # Server Information Exposure - COMPLETE server_header = headers_dict.get('Server', '') if server_header: # Find ALL version patterns version_patterns = [ r'\d+\.\d+(?:\.\d+)?(?:\.\d+)?', r'v\d+(?:\.\d+)?', r'\d{8}', r'\d{4}[a-z]?' ] exposed_versions = [] for pattern in version_patterns: matches = re.findall(pattern, server_header) exposed_versions.extend(matches) if exposed_versions: analysis['issues'].append(f"โš ๏ธ Server version exposed: {server_header}") analysis['risk_score'] += min(len(exposed_versions) * 0.08, 0.25) # Memory Leak Patterns - COMPLETE analysis memory_patterns = self.regex.compiled_patterns['memory_leak_patterns'] all_memory_matches = [] pattern_statistics = { 'hex_strings': 0, 'null_sequences': 0, 'non_printable': 0, 'uuids': 0, 'memory_addresses': 0, 'total_patterns': 0 } for pattern_idx, pattern in enumerate(memory_patterns): pattern_matches = list(pattern.finditer(content)) for match in pattern_matches: match_text = match.group(0) match_start = match.start() match_end = match.end() # Determine pattern type if re.match(r'[0-9a-fA-F]{32,}', match_text): pattern_type = 'hex_string' pattern_statistics['hex_strings'] += 1 elif re.match(r'(?s)\x00{4,}', match_text): pattern_type = 'null_sequence' pattern_statistics['null_sequences'] += 1 elif re.match(r'[^\x20-\x7E]{20,}', match_text): pattern_type = 'non_printable' pattern_statistics['non_printable'] += 1 elif re.match(r'[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}', match_text): pattern_type = 'uuid' pattern_statistics['uuids'] += 1 elif re.match(r'0x[0-9a-fA-F]{8,16}', match_text): pattern_type = 'memory_address' pattern_statistics['memory_addresses'] += 1 else: pattern_type = 'unknown' # Get context context_start = max(0, match_start - 200) context_end = min(len(content), match_end + 200) context = content[context_start:context_end] # Store COMPLETE pattern all_memory_matches.append({ 'pattern': match_text, 'type': pattern_type, 'length': len(match_text), 'position': match_start, 'context': context, 'hex_representation': match_text.encode('utf-8', errors='ignore').hex(), 'risk_score': min(len(match_text) / 1000, 0.8) }) pattern_statistics['total_patterns'] += 1 # Update analysis with statistics analysis['pattern_statistics'] = pattern_statistics # Sort patterns by length (longer = more suspicious) all_memory_matches.sort(key=lambda x: x['length'], reverse=True) # Add ALL patterns to analysis for match in all_memory_matches: analysis['memory_patterns'].append(match) analysis['risk_score'] += match['risk_score'] # Cloudflare-specific risks - COMPLETE if fingerprint.get('cdn') == 'Cloudflare': analysis['issues'].append("๐Ÿ›ก๏ธ Cloudflare detected - potential Cloudbleed scenario") analysis['risk_score'] += 0.2 if analysis.get('memory_patterns'): pattern_count = len(analysis['memory_patterns']) analysis['issues'].append(f"๐Ÿšจ {pattern_count} potential Cloudbleed memory leak patterns detected") analysis['risk_score'] += min(pattern_count * 0.1, 0.5) analysis['mitre_tactics'].append("TA0009 - Collection (Cloudbleed)") if str(response.url).startswith('http:'): analysis['issues'].append("โš ๏ธ Cloudflare without HTTPS - potential downgrade attacks") analysis['risk_score'] += 0.25 # Determine risk level based on COMPLETE score if analysis['risk_score'] >= 0.75: analysis['risk_level'] = 'critical' elif analysis['risk_score'] >= 0.5: analysis['risk_level'] = 'high' elif analysis['risk_score'] >= 0.3: analysis['risk_level'] = 'medium' else: analysis['risk_level'] = 'low' # Generate COMPLETE recommendations if analysis['risk_score'] > 0.6: analysis['recommendations'].append("๐Ÿ”ด IMMEDIATE ACTION REQUIRED: Investigate potential Cloudbleed memory leaks") analysis['recommendations'].append("๐Ÿ”ด Contact Cloudflare support and security team immediately") if analysis.get('memory_patterns'): analysis['recommendations'].append("๐Ÿ” Investigate ALL memory leak patterns found in the report") analysis['recommendations'].append("๐Ÿ”„ Rotate ALL API keys, tokens, and credentials immediately") if fingerprint.get('cdn') == 'Cloudflare': analysis['recommendations'].append("๐Ÿ›ก๏ธ Review Cloudflare configuration for potential memory leak issues") analysis['recommendations'].append("๐Ÿ“Š Enable Cloudflare logging and monitoring for suspicious activity") if missing_critical: analysis['recommendations'].append("๐Ÿ”ง Implement missing security headers immediately") analysis['recommendations'].append("๐Ÿ“– Follow OWASP security header guidelines") return analysis async def enrich_intelligence_complete(self, url: str, response, content: str, fingerprint: Dict, findings: Dict) -> Dict: """Complete intelligence enrichment""" intelligence = { 'ioc_score': 0.0, 'ioc_classification': {}, 'mitre_tactics': [], 'threat_level': 'low', 'enrichment_data': {}, 'timestamp': datetime.now().isoformat() } if self.intelligence_scorer: score, classification, tactics = self.intelligence_scorer.calculate_ioc_score( findings, fingerprint ) intelligence['ioc_score'] = score intelligence['ioc_classification'] = { 'critical': classification.critical, 'suspicious': classification.suspicious, 'low_risk': classification.low_risk } intelligence['mitre_tactics'] = [ { 'id': tactic.id, 'name': tactic.name, 'confidence': tactic.confidence, 'techniques': tactic.techniques } for tactic in tactics ] # Determine COMPLETE threat level if score >= 0.8: intelligence['threat_level'] = 'critical' elif score >= 0.6: intelligence['threat_level'] = 'high' elif score >= 0.4: intelligence['threat_level'] = 'medium' elif score >= 0.2: intelligence['threat_level'] = 'low' else: intelligence['threat_level'] = 'informational' parsed_url = urlparse(url) domain = parsed_url.netloc intelligence['enrichment_data']['domain_analysis'] = { 'domain': domain, 'tld': domain.split('.')[-1] if '.' in domain else '', 'subdomain_count': len(domain.split('.')) - 2 if '.' in domain else 0, 'url_structure': { 'scheme': parsed_url.scheme, 'netloc': parsed_url.netloc, 'path': parsed_url.path, 'params': parsed_url.params, 'query': parsed_url.query, 'fragment': parsed_url.fragment } } # Content statistics intelligence['enrichment_data']['content_stats'] = { 'size_bytes': len(content), 'line_count': content.count('\n'), 'word_count': len(content.split()), 'character_count': len(content), 'binary_percentage': sum(1 for c in content if ord(c) < 32 or ord(c) > 126) / len(content) * 100 if content else 0 } return intelligence def display_result_complete(self, result: Dict): """Display COMPLETE results with NO truncation""" print("\n" + "="*120) print(f"๐Ÿšจ ๐Ÿšจ ๐Ÿšจ CLOUDBLEED COMPLETE SCAN REPORT ๐Ÿšจ ๐Ÿšจ ๐Ÿšจ") print(f"๐ŸŒ URL: {result['url']}") print("="*120) if result['error']: print(f"โŒ โŒ โŒ SCAN ERROR โŒ โŒ โŒ") print(f"Error: {result['error']}") print("="*120) return # Basic Info - COMPLETE print(f"\n๐Ÿ“Š ๐Ÿ“Š ๐Ÿ“Š BASIC INFORMATION ๐Ÿ“Š ๐Ÿ“Š ๐Ÿ“Š") print(f" โœ… Status Code: {result.get('status', 'N/A')}") print(f" ๐Ÿ“ Content Size: {result.get('content_length', 0):,} bytes") print(f" ๐Ÿ“„ Content Type: {result.get('content_type', 'Unknown')}") print(f" ๐Ÿ” Content Hash (MD5): {result.get('content_hash', 'N/A')}") print(f" ๐Ÿ–ฅ๏ธ Server: {result.get('server', 'Unknown')}") print(f" ๐Ÿ”— Final URL: {result.get('final_url', 'N/A')}") print(f" ๐Ÿ• Scan Time: {result.get('timestamp', 'Unknown')}") # Fingerprinting - COMPLETE fingerprint = result.get('fingerprint', {}) if fingerprint: print(f"\n๐Ÿ–ฅ๏ธ ๐Ÿ–ฅ๏ธ ๐Ÿ–ฅ๏ธ COMPLETE PLATFORM FINGERPRINTING ๐Ÿ–ฅ๏ธ ๐Ÿ–ฅ๏ธ ๐Ÿ–ฅ๏ธ") tech_info = [ ('๐ŸŒ CDN Provider', 'cdn'), ('๐Ÿ›ก๏ธ WAF Protection', 'waf'), ('๐Ÿ’ป Programming Language', 'language'), ('๐Ÿ—๏ธ Web Framework', 'framework'), ('๐Ÿ–ฅ๏ธ Server Software', 'server_software'), ] for display_name, key in tech_info: if fingerprint.get(key): print(f" โ€ข {display_name}: {fingerprint[key]}") if fingerprint.get('technologies'): print(f"\n ๐Ÿ› ๏ธ ALL DETECTED TECHNOLOGIES:") for tech in fingerprint['technologies']: print(f" โœ“ {tech}") if fingerprint.get('content_indicators'): print(f"\n ๐Ÿ” CONTENT INDICATORS:") for indicator in fingerprint['content_indicators'][:10]: print(f" โ€ข {indicator}") print(f"\n ๐Ÿ“Š FINGERPRINT RISK SCORE: {fingerprint.get('risk_score', 0):.2f}/1.0") if fingerprint.get('cloudbleed_risk', 0) > 0: print(f" ๐Ÿšจ CLOUDBLEED RISK SCORE: {fingerprint.get('cloudbleed_risk', 0):.2f}/1.0") # Headers Analysis - COMPLETE headers_data = result.get('findings', {}).get('headers', {}) if headers_data: print(f"\n๐Ÿ“‹ ๐Ÿ“‹ ๐Ÿ“‹ COMPLETE HEADERS ANALYSIS ๐Ÿ“‹ ๐Ÿ“‹ ๐Ÿ“‹") if headers_data.get('missing_headers'): print(f"\n โŒ MISSING CRITICAL SECURITY HEADERS:") for idx, header in enumerate(headers_data['missing_headers'], 1): print(f" {idx:2d}. {header}") if headers_data.get('vulnerabilities'): print(f"\n โš ๏ธ HEADER VULNERABILITIES:") for idx, vuln in enumerate(headers_data['vulnerabilities'][:10], 1): print(f" {idx:2d}. {vuln}") # Security Analysis - COMPLETE security = result.get('findings', {}).get('security', {}) if security: print(f"\n๐Ÿ”’ ๐Ÿ”’ ๐Ÿ”’ COMPLETE SECURITY ANALYSIS ๐Ÿ”’ ๐Ÿ”’ ๐Ÿ”’") print(f" ๐ŸŽฏ OVERALL RISK LEVEL: {security.get('risk_level', 'low').upper()}") print(f" ๐Ÿ“ˆ RISK SCORE: {security.get('risk_score', 0):.2f}/1.0") if security.get('issues'): print(f"\n โš ๏ธ โš ๏ธ โš ๏ธ SECURITY ISSUES FOUND:") for idx, issue in enumerate(security.get('issues', []), 1): print(f" {idx:2d}. {issue}") # Memory Leak Patterns - COMPLETE display if security.get('memory_patterns'): memory_patterns = security['memory_patterns'] print(f"\n ๐Ÿšจ ๐Ÿšจ ๐Ÿšจ MEMORY LEAK PATTERNS DETECTED ๐Ÿšจ ๐Ÿšจ ๐Ÿšจ") print(f" ๐Ÿ“Š TOTAL PATTERNS: {len(memory_patterns)}") if security.get('pattern_statistics'): stats = security['pattern_statistics'] print(f"\n ๐Ÿ“ˆ PATTERN STATISTICS:") print(f" โ€ข Hex Strings: {stats.get('hex_strings', 0)}") print(f" โ€ข Null Sequences: {stats.get('null_sequences', 0)}") print(f" โ€ข Non-Printable: {stats.get('non_printable', 0)}") print(f" โ€ข UUIDs: {stats.get('uuids', 0)}") print(f" โ€ข Memory Addresses: {stats.get('memory_addresses', 0)}") print(f" โ€ข Total Patterns: {stats.get('total_patterns', 0)}") # Show first 5 patterns completely print(f"\n ๐Ÿ” FIRST 5 PATTERNS (COMPLETE):") for idx, pattern_info in enumerate(memory_patterns[:5], 1): if isinstance(pattern_info, dict): pattern = pattern_info.get('pattern', '') length = pattern_info.get('length', 0) pattern_type = pattern_info.get('type', 'unknown') print(f"\n {idx}. TYPE: {pattern_type}, LENGTH: {length} chars") print(f" {'โ”€'*60}") # Display COMPLETE pattern if length > 500: print(f" FIRST 500 CHARACTERS:") print(f" {pattern[:500]}...") print(f" ... [continued in full report] ...") else: print(f" {pattern}") print(f" {'โ”€'*60}") else: print(f"\n {idx}. {str(pattern_info)}") if len(memory_patterns) > 5: print(f"\n ... and {len(memory_patterns) - 5} more patterns") print(f" ๐Ÿ“„ See complete report for ALL patterns") if security.get('recommendations'): print(f"\n ๐Ÿ’ก ๐Ÿ’ก ๐Ÿ’ก SECURITY RECOMMENDATIONS:") for idx, rec in enumerate(security.get('recommendations', []), 1): print(f" {idx:2d}. {rec}") # Sensitive Data - COMPLETE sensitive_data = result.get('findings', {}).get('sensitive_data', {}) if sensitive_data: print(f"\n๐Ÿšจ ๐Ÿšจ ๐Ÿšจ SENSITIVE DATA DETECTED ๐Ÿšจ ๐Ÿšจ ๐Ÿšจ") total_items = sum(len(items) for items in sensitive_data.values()) print(f" ๐Ÿ“Š TOTAL SENSITIVE ITEMS FOUND: {total_items}") for category, items in sensitive_data.items(): if items: print(f"\n ๐Ÿ“ {category.upper()}: {len(items)} items") # Show first 3 items completely for idx, item in enumerate(items[:3], 1): if isinstance(item, dict): value = item.get('value', 'N/A') confidence = item.get('confidence', 0) length = item.get('length', len(value)) print(f"\n {idx}. CONFIDENCE: {confidence:.0%}, LENGTH: {length} chars") print(f" {'โ”€'*60}") # Display COMPLETE value if length > 300: print(f" FIRST 300 CHARACTERS:") print(f" {value[:300]}...") print(f" ... [full value in report] ...") else: print(f" {value}") print(f" {'โ”€'*60}") else: print(f"\n {idx}. {str(item)[:200]}..." if len(str(item)) > 200 else f" {idx}. {str(item)}") if len(items) > 3: print(f"\n ... and {len(items) - 3} more {category}") # Cloudflare Detection - COMPLETE cloudflare = result.get('findings', {}).get('cloudflare', {}) if cloudflare: print(f"\n๐Ÿ›ก๏ธ ๐Ÿ›ก๏ธ ๐Ÿ›ก๏ธ CLOUDFLARE DETECTION ๐Ÿ›ก๏ธ ๐Ÿ›ก๏ธ ๐Ÿ›ก๏ธ") print(f" ๐Ÿ” DETECTED: {'YES' if cloudflare.get('detected') else 'NO'}") print(f" ๐Ÿ“Š CONFIDENCE: {cloudflare.get('confidence', 0):.0%}") if cloudflare.get('detected') and cloudflare.get('indicators'): print(f"\n ๐Ÿ“‹ INDICATORS FOUND: {cloudflare.get('indicator_count', 0)}") indicators = cloudflare.get('indicators', []) for idx, indicator in enumerate(indicators[:5], 1): if isinstance(indicator, dict): print(f" {idx}. {indicator.get('type', 'unknown')}: {indicator.get('pattern', 'unknown')}") else: print(f" {idx}. {indicator}") # Intelligence Data - COMPLETE intelligence = result.get('intelligence', {}) if intelligence: print(f"\n๐Ÿง  ๐Ÿง  ๐Ÿง  THREAT INTELLIGENCE ๐Ÿง  ๐Ÿง  ๐Ÿง ") print(f" ๐Ÿ“Š IOC SCORE: {intelligence.get('ioc_score', 0):.2f}/1.0") print(f" ๐ŸŽฏ THREAT LEVEL: {intelligence.get('threat_level', 'low').upper()}") ioc_classification = intelligence.get('ioc_classification', {}) for level, items in ioc_classification.items(): if items: print(f"\n ๐Ÿ“ {level.upper()} IOCS ({len(items)}):") for idx, item in enumerate(items[:5], 1): print(f" {idx}. {item[:100]}..." if len(item) > 100 else f" {idx}. {item}") print("\n" + "="*120) # Save COMPLETE report try: saved_file = self.report_saver.save_complete_report(result) print(f"\n๐Ÿ’พ ๐Ÿ’พ ๐Ÿ’พ COMPLETE CLOUDBLEED REPORT SAVED TO: {saved_file}") print(f"๐Ÿ“„ File contains ALL data with NO truncation") # Show file statistics if os.path.exists(saved_file): file_size = os.path.getsize(saved_file) print(f"๐Ÿ“ Report size: {file_size:,} bytes ({file_size/1024:.1f} KB)") with open(saved_file, 'r', encoding='utf-8') as f: lines = f.readlines() print(f"๐Ÿ“ Total lines: {len(lines):,}") except Exception as e: print(f"\nโš ๏ธ Could not save complete report: {e}") async def scan_multiple_complete(self, urls): """Scan multiple URLs with COMPLETE analysis""" print(f"\n๐Ÿš€ ๐Ÿš€ ๐Ÿš€ Starting COMPLETE scan of {len(urls)} URLs...") print(f"โฐ Start time: {datetime.now().strftime('%H:%M:%S')}") results = [] for i, url in enumerate(urls, 1): print(f"\n{'='*80}") print(f"[{i}/{len(urls)}] ๐Ÿ” Scanning: {url}") print(f"{'='*80}") result = await self.scan_url(url) results.append(result) self.display_result_complete(result) # Delay between requests if i < len(urls): delay = 2 if i % 5 == 0 else 1 print(f"\nโณ Waiting {delay} second before next scan...") await asyncio.sleep(delay) # Generate COMPLETE report self.generate_complete_report(results) return results def generate_complete_report(self, results, filename="cloudbleed_complete_master_report.json"): """Generate COMPLETE master report""" print(f"\n๐Ÿ“Š ๐Ÿ“Š ๐Ÿ“Š GENERATING COMPLETE MASTER REPORT ๐Ÿ“Š ๐Ÿ“Š ๐Ÿ“Š") report = { 'scan_date': datetime.now().isoformat(), 'scan_version': '4.0-COMPLETE', 'total_scans': len(results), 'successful_scans': len([r for r in results if r.get('success', False)]), 'failed_scans': len([r for r in results if not r.get('success', False)]), 'results': results } # COMPLETE Statistics stats = { 'cloudflare_sites': 0, 'sensitive_data_sites': 0, 'memory_leak_sites': 0, 'critical_risk_sites': 0, 'high_risk_sites': 0, 'medium_risk_sites': 0, 'low_risk_sites': 0, 'total_memory_patterns': 0, 'total_sensitive_items': 0, 'sites_with_cloudbleed_risk': 0 } for result in results: if result.get('success'): findings = result.get('findings', {}) if findings.get('cloudflare', {}).get('detected'): stats['cloudflare_sites'] += 1 if findings.get('sensitive_data'): sensitive_count = sum(len(items) for items in findings['sensitive_data'].values()) stats['total_sensitive_items'] += sensitive_count stats['sensitive_data_sites'] += 1 security = findings.get('security', {}) if security.get('memory_patterns'): pattern_count = len(security['memory_patterns']) stats['total_memory_patterns'] += pattern_count stats['memory_leak_sites'] += 1 # Risk level classification risk_level = security.get('risk_level', 'low') if risk_level == 'critical': stats['critical_risk_sites'] += 1 elif risk_level == 'high': stats['high_risk_sites'] += 1 elif risk_level == 'medium': stats['medium_risk_sites'] += 1 else: stats['low_risk_sites'] += 1 # Cloudbleed-specific risk fingerprint = result.get('fingerprint', {}) if fingerprint.get('cdn') == 'Cloudflare' and (findings.get('sensitive_data') or security.get('memory_patterns')): stats['sites_with_cloudbleed_risk'] += 1 report['statistics'] = stats # Save COMPLETE report with open(filename, 'w', encoding='utf-8', errors='replace') as f: json.dump(report, f, indent=2, ensure_ascii=False, default=str) print(f"\n๐Ÿ’พ ๐Ÿ’พ ๐Ÿ’พ COMPLETE MASTER REPORT SAVED TO: {filename}") # Display COMPLETE statistics print(f"\n๐Ÿ“Š ๐Ÿ“Š ๐Ÿ“Š CLOUDBLEED SCAN STATISTICS ๐Ÿ“Š ๐Ÿ“Š ๐Ÿ“Š") print(f"{'='*80}") print(f"Total URLs Scanned: {stats['cloudflare_sites'] + stats['sensitive_data_sites'] + stats['memory_leak_sites'] + stats['critical_risk_sites'] + stats['high_risk_sites'] + stats['medium_risk_sites'] + stats['low_risk_sites']}") print(f"Cloudflare Sites: {stats['cloudflare_sites']}") print(f"Sites with Sensitive Data: {stats['sensitive_data_sites']} ({stats['total_sensitive_items']} items)") print(f"Sites with Memory Leak Patterns: {stats['memory_leak_sites']} ({stats['total_memory_patterns']} patterns)") print(f"Sites with Cloudbleed Risk: {stats['sites_with_cloudbleed_risk']}") print(f"\nRisk Distribution:") print(f" โ€ข Critical Risk: {stats['critical_risk_sites']}") print(f" โ€ข High Risk: {stats['high_risk_sites']}") print(f" โ€ข Medium Risk: {stats['medium_risk_sites']}") print(f" โ€ข Low Risk: {stats['low_risk_sites']}") print(f"{'='*80}") return report async def main_complete(): """Main function for COMPLETE scanner""" print(""" โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•— โ•‘ CLOUDBLEED SCANNER v4.0 - COMPLETE EDITION โ•‘ โ•‘ Cloudflare Memory Leak Detection - SHOWS ALL DATA โ•‘ โ•‘ NO TRUNCATION - COMPLETE INFORMATION DISPLAY โ•‘ โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ• """) print("โš ๏ธ โš ๏ธ โš ๏ธ WARNING: Use only for authorized security testing!") print(" Unauthorized scanning is illegal in most countries.\n") print("๐Ÿ” This version shows ALL data with NO truncation") print("๐Ÿ“„ Complete reports are saved for full analysis\n") scanner = CompleteCloudbleedScanner( enable_cache=True, enable_intelligence=True ) while True: try: print("\n" + "="*70) print("๐Ÿ“‹ ๐Ÿ“‹ ๐Ÿ“‹ COMPLETE SCANNER OPTIONS ๐Ÿ“‹ ๐Ÿ“‹ ๐Ÿ“‹") print("="*70) print(" 1. ๐Ÿ” Scan single URL (COMPLETE analysis)") print(" 2. ๐Ÿ“ Scan multiple URLs from file") print(" 3. ๐Ÿงช Test scan with predefined URLs") print(" 4. ๐Ÿ—‘๏ธ Clear cache") print(" 5. ๐Ÿ“Š Show statistics") print(" 6. ๐Ÿšช Exit") print("="*70) choice = input("\nEnter choice (1-6): ").strip() if choice == '1': url = input("\n๐ŸŒ Enter URL to scan (COMPLETE analysis): ").strip() if not url: print("โŒ URL cannot be empty!") continue if not url.startswith(('http://', 'https://')): url = 'https://' + url print(f"โ„น๏ธ Added https:// automatically: {url}") print(f"\n๐Ÿ” Starting COMPLETE scan of: {url}") result = await scanner.scan_url(url) scanner.display_result_complete(result) elif choice == '2': filename = input("\n๐Ÿ“ Enter filename with URLs (one per line): ").strip() try: with open(filename, 'r', encoding='utf-8') as f: urls = [line.strip() for line in f if line.strip()] if not urls: print("โŒ File is empty or contains no URLs!") continue print(f"๐Ÿ“Š Found {len(urls)} URLs in file") print(f"๐Ÿ“ Sample URLs:") for url in urls[:3]: print(f" โ€ข {url}") if len(urls) > 3: print(f" ... and {len(urls) - 3} more") confirm = input("\nโš ๏ธ โš ๏ธ โš ๏ธ Start COMPLETE scanning of ALL URLs? (yes/no): ").strip().lower() if confirm in ['yes', 'y', '']: print(f"\n๐Ÿš€ Starting COMPLETE scan of {len(urls)} URLs...") await scanner.scan_multiple_complete(urls) else: print("โŒ Scan cancelled") except FileNotFoundError: print(f"โŒ File {filename} not found!") except Exception as e: print(f"โŒ Error reading file: {e}") elif choice == '3': test_urls = [ 'https://httpbin.org/headers', 'https://httpbin.org/html', 'https://example.com', 'https://httpbin.org/status/200', 'https://httpbin.org/json' ] print(f"\n๐Ÿงช Testing with {len(test_urls)} predefined URLs...") print("โ„น๏ธ These are public test URLs for demonstration") confirm = input("\nStart test scan? (yes/no): ").strip().lower() if confirm in ['yes', 'y', '']: for url in test_urls: result = await scanner.scan_url(url) scanner.display_result_complete(result) await asyncio.sleep(1) else: print("โŒ Test cancelled") elif choice == '4': if os.path.exists(".cache"): import shutil shutil.rmtree(".cache") print("โœ… Cache cleared successfully") else: print("โ„น๏ธ No cache directory found") elif choice == '5': print("\n๐Ÿ“Š ๐Ÿ“Š ๐Ÿ“Š SCANNER STATISTICS ๐Ÿ“Š ๐Ÿ“Š ๐Ÿ“Š") print("="*60) if os.path.exists(".cache"): cache_size = sum(f.stat().st_size for f in Path(".cache").rglob('*') if f.is_file()) print(f"Cache size: {cache_size:,} bytes ({cache_size/1024/1024:.2f} MB)") else: print("Cache: Not initialized") print("="*60) elif choice == '6': print("\n๐Ÿ‘‹ ๐Ÿ‘‹ ๐Ÿ‘‹ Goodbye! ๐Ÿ‘‹ ๐Ÿ‘‹ ๐Ÿ‘‹") break else: print(f"โŒ Invalid choice: {choice}") except KeyboardInterrupt: print("\n\nโš ๏ธ Scan interrupted by user") break except Exception as e: print(f"\nโŒ Unexpected error: {e}") import traceback traceback.print_exc() if __name__ == "__main__": # Windows compatibility if sys.platform == 'win32': asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) try: asyncio.run(main_complete()) except KeyboardInterrupt: print("\n\n๐Ÿ‘‹ Exiting...") sys.exit(0) except Exception as e: print(f"\n๐Ÿ’ฅ Critical error: {e}") import traceback traceback.print_exc() sys.exit(1) Greetings to :===================================================================================== jericho * Larry W. Cashdollar * LiquidWorm * Hussin-X * D4NB4R * Malvuln (John Page aka hyp3rlinx)| ===================================================================================================