Source code for beagle.transformers.procmon_transformer
import re
from typing import Optional, Tuple
from beagle.common import split_path
from beagle.nodes import File, Process, RegistryKey
from beagle.nodes.ip_address import IPAddress
from beagle.transformers.base_transformer import Transformer
class ProcmonTransformer(Transformer):
    """Turn Procmon event dictionaries into beagle nodes.

    Each event is expected to carry at least an ``event_type`` key; the
    handlers additionally read ``process_id``, ``process_name``, ``path``,
    ``event_time`` and (for process creation) ``params``.
    """

    name = "Procmon"

    # Layout of the ``params`` field on "Process Create" events. Compiled once
    # because ``transform`` is invoked for every event.
    _PROCESS_CREATE_PARAMS = re.compile(r"PID: (\d*), Command line: (.*)")

    def transform(self, event: dict) -> Optional[Tuple]:
        """Dispatch ``event`` to the handler matching its ``event_type``.

        Args:
            event: A single Procmon event.

        Returns:
            The tuple of nodes produced by the matching handler, or ``None``
            when the operation is not one this transformer handles.
        """
        operation = event["event_type"]

        if operation == "Process Create":
            return self.process_create(event)
        if operation in ("WriteFile", "CreateFile"):
            return self.write_file(event)
        if operation in ("CloseFile", "ReadFile"):
            return self.access_file(event)
        if operation in ("RegOpenKey", "RegQueryKey", "RegQueryValue", "RegCloseKey"):
            return self.access_reg_key(event)
        if operation in (
            "TCP Send",
            "TCP Receive",
            "TCP Connect",
            "UDP Connect",
            # "UDP Send" is the outgoing counterpart of "UDP Receive"; without
            # it outgoing UDP traffic was silently dropped.
            "UDP Send",
            "UDP Receive",
        ):
            return self.connection(event)

        return None

    def _acting_process(self, event: dict) -> Process:
        """Build the ``Process`` node for the process that generated ``event``."""
        return Process(
            process_id=int(event["process_id"]),
            process_image=event["process_name"],
        )

    def process_create(self, event: dict) -> Tuple[Process, File, Process]:
        """Handle a "Process Create" event.

        The child's PID and command line are parsed out of ``event["params"]``;
        when that field does not have the expected layout the PID stays ``-1``
        and the command line stays ``None``.

        Args:
            event: The Procmon event; ``path`` holds the child's image path.

        Returns:
            ``(child process, child's file node, parent process)``.
        """
        pid = -1
        command_line = None

        match = self._PROCESS_CREATE_PARAMS.match(event["params"])
        if match:
            pid_text, command_line = match.groups()
            # ``\d*`` can capture an empty string; int("") would raise, so keep
            # the -1 sentinel in that case while still recording the command line.
            if pid_text:
                pid = int(pid_text)

        process_image, process_image_path = split_path(event["path"])

        proc = Process(
            process_id=pid,
            process_image=process_image,
            process_image_path=process_image_path,
            command_line=command_line,
        )

        proc_file = proc.get_file_node()
        # Bare indexing with no ``append``: presumably this alone registers the
        # file -> process edge without attaching data (confirm in beagle.nodes).
        proc_file.file_of[proc]

        parent = self._acting_process(event)
        parent.launched[proc].append(timestamp=event["event_time"])

        return (proc, proc_file, parent)

    def write_file(self, event: dict) -> Tuple[Process, File]:
        """Handle a file write/create event.

        Args:
            event: The Procmon event; ``path`` is the file that was written.

        Returns:
            ``(acting process, target file)`` joined by a ``wrote`` edge.
        """
        proc = self._acting_process(event)

        file_name, file_path = split_path(event["path"])
        target_file = File(file_name=file_name, file_path=file_path)

        proc.wrote[target_file].append(timestamp=event["event_time"])

        return (proc, target_file)

    def access_file(self, event: dict) -> Tuple[Process, File]:
        """Handle a file read/close event.

        Args:
            event: The Procmon event; ``path`` is the file that was accessed.

        Returns:
            ``(acting process, target file)`` joined by an ``accessed`` edge.
        """
        proc = self._acting_process(event)

        file_name, file_path = split_path(event["path"])
        target_file = File(file_name=file_name, file_path=file_path)

        proc.accessed[target_file].append(timestamp=event["event_time"])

        return (proc, target_file)

    def access_reg_key(self, event: dict) -> Tuple[Process, RegistryKey]:
        """Handle a registry open/query/close event.

        The first backslash-separated component of the key's parent path is
        used as the hive; the remainder becomes the key path.

        Args:
            event: The Procmon event; ``path`` is the full registry path.

        Returns:
            ``(acting process, registry key)`` joined by a ``read_key`` edge.
        """
        proc = self._acting_process(event)

        reg_key, reg_path = split_path(event["path"])
        # Split once: everything before the first backslash is the hive.
        hive, _, key_path = reg_path.partition("\\")

        reg_node = RegistryKey(hive=hive, key_path=key_path, key=reg_key)

        proc.read_key[reg_node].append(timestamp=event["event_time"])

        return (proc, reg_node)

    def connection(self, event: dict) -> Tuple[Process, IPAddress]:
        """Handle a TCP/UDP network event.

        The remote endpoint is taken from the text after the last ``->`` in
        ``event["path"]`` and split into address and port at the *last* colon,
        so addresses that themselves contain colons keep them.

        Args:
            event: The Procmon event.

        Returns:
            ``(acting process, remote address)`` joined by a ``connected_to``
            edge carrying the timestamp, port and protocol.

        Raises:
            ValueError: If the endpoint has no ``:`` separator or the port is
                not an integer.
        """
        proc = self._acting_process(event)

        dest_addr = event["path"].split("->")[-1].lstrip()

        ip_addr, separator, port = dest_addr.rpartition(":")
        if not separator:
            raise ValueError(
                "Expected '<address>:<port>' in connection path, got {!r}".format(dest_addr)
            )

        addr = IPAddress(ip_addr)

        proc.connected_to[addr].append(
            timestamp=event["event_time"],
            port=int(port),
            # The protocol is the first word of the operation, e.g. "TCP Send" -> "TCP".
            protocol=event["event_type"].split(" ")[0],
        )

        return (proc, addr)