============================================================================================================================================= | # Title : Vertex AI Experiments 1.21.0 to 1.132.x Predictable Bucket Naming Leading to Cross‑Tenant RCE and Model Theft | | # Author : indoushka | | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.4 (64 bits) | | # Vendor : https://cloud.google.com/vertex-ai | ============================================================================================================================================= [+] Summary : A vulnerability identified as CVE-2026-2473 affected Google Cloud Vertex AI, specifically the Vertex AI Experiments component, in versions 1.21.0 through 1.132.x (fixed in 1.133.0 and later). The issue stemmed from predictable Cloud Storage bucket naming patterns, enabling a class of attack known as Bucket Squatting. Because bucket names in Google Cloud Storage are globally unique, an attacker could pre-create a bucket matching a predictable name before a legitimate tenant did. This allowed a potential unauthenticated remote attacker to: Achieve cross-tenant Remote Code Execution (RCE) Exfiltrate trained models (Model Theft) Poison experiment artifacts or training outputs Interfere with experiment tracking pipelines [+] POC : #!/usr/bin/env python3 import google.cloud.storage as storage import time import sys import os import json import argparse from datetime import datetime from typing import List, Dict import random import string class VertexAIBucketDefensiveScanner: """ Scanner for detecting Bucket Squatting risk in Google Cloud Storage used by Vertex AI Experiments by indoushka. """ def __init__(self, project_id: str, credentials_path: str): self.project_id = project_id self.findings = [] try: self.client = storage.Client.from_service_account_json(credentials_path) print(f"[✓] Connected to GCS project: {project_id}") except Exception as e: print(f"[✗] Connection failed: {e}") sys.exit(1) self.bucket_patterns = [ "vertex-ai-experiment-{exp_id}-{timestamp}", "vertex-ai-exp-{exp_id}-data-{timestamp}", "vertex-{exp_id}-experiment-{random}", "ai-platform-{exp_id}-{timestamp}", "vertex-ai-{exp_id}-{user_id}", "experiment-{exp_id}-bucket-{timestamp}", "vertex-{random}-{exp_id}-data" ] def generate_bucket_names(self, exp_id: str, count: int = 5) -> List[str]: predicted = [] timestamp = int(time.time()) for pattern in self.bucket_patterns: for i in range(count): random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8)) user_id = f"user-{random.randint(1000, 9999)}" name = pattern.format( exp_id=exp_id, timestamp=timestamp + i, random=random_str, user_id=user_id ) predicted.append(name) return predicted def check_bucket_status(self, bucket_name: str) -> Dict: """ Checks if bucket exists and whether accessible WITHOUT creating it. """ bucket = self.client.bucket(bucket_name) try: exists = bucket.exists() if not exists: return { "bucket": bucket_name, "status": "AVAILABLE", "risk": "HIGH", "reason": "Predictable name and not registered." } try: bucket.reload() owner = bucket.project_number if owner != self.client.project: return { "bucket": bucket_name, "status": "EXISTS_EXTERNAL", "risk": "CRITICAL", "reason": "Bucket exists but not owned by this project." } return { "bucket": bucket_name, "status": "OWNED", "risk": "LOW", "reason": "Bucket exists and owned by project." } except Exception: return { "bucket": bucket_name, "status": "EXISTS_NO_ACCESS", "risk": "MEDIUM", "reason": "Bucket exists but metadata not accessible." } except Exception as e: return { "bucket": bucket_name, "status": "ERROR", "risk": "UNKNOWN", "reason": str(e) } def scan_experiment_range(self, start_id: int, end_id: int): print(f"[*] Scanning experiment IDs {start_id} → {end_id}") for exp_id in range(start_id, end_id + 1): bucket_names = self.generate_bucket_names(str(exp_id), count=3) for name in bucket_names: result = self.check_bucket_status(name) self.findings.append(result) print(f"[{result['risk']}] {name} → {result['status']}") time.sleep(0.2) def generate_report(self) -> Dict: summary = { "CRITICAL": len([f for f in self.findings if f["risk"] == "CRITICAL"]), "HIGH": len([f for f in self.findings if f["risk"] == "HIGH"]), "MEDIUM": len([f for f in self.findings if f["risk"] == "MEDIUM"]), "LOW": len([f for f in self.findings if f["risk"] == "LOW"]), } return { "timestamp": datetime.now().isoformat(), "project_id": self.project_id, "summary": summary, "total_checked": len(self.findings), "findings": self.findings } def main(): parser = argparse.ArgumentParser( description="Defensive Scanner - Vertex AI Bucket Squatting Risk" ) parser.add_argument('--project', required=True) parser.add_argument('--creds', required=True) parser.add_argument('--start', type=int, default=1000) parser.add_argument('--end', type=int, default=1010) args = parser.parse_args() scanner = VertexAIBucketDefensiveScanner( project_id=args.project, credentials_path=args.creds ) print("=" * 60) print("Vertex AI Bucket Squatting Defensive Scanner") print("Mode: Detection Only (No Exploitation)") print("=" * 60) scanner.scan_experiment_range(args.start, args.end) report = scanner.generate_report() with open("defensive_scan_report.json", "w") as f: json.dump(report, f, indent=2) print("\n[✓] Report saved to defensive_scan_report.json") if __name__ == "__main__": exit(main()) Greetings to :============================================================================== jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)| ============================================================================================