Source code for beagle.transformers.sysmon_transformer
from typing import Dict, List, Optional, Tuple, Union
from beagle.common import logger, split_path
from beagle.constants import Protocols
from beagle.nodes import URI, Alert, Domain, File, IPAddress, Node, Process, RegistryKey
from beagle.transformers.base_transformer import Transformer
[docs]class SysMonProc(Process):
"""A custom Process class which extends the regular one. Adds
the unique Sysmon process_guid identifier.
"""
key_fields: List[str] = ["process_guid"]
process_guid: Optional[str]
def __init__(self, process_guid: str = None, *args, **kwargs) -> None:
self.process_guid = process_guid
super().__init__(*args, **kwargs)
[docs]class SysmonTransformer(Transformer):
name = "Sysmon"
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
logger.info("Created Sysmon Transformer.")
[docs] def transform(self, event: dict) -> Optional[Tuple]:
# Get the sysmon event ID
event_id = int(event["EventID"])
if event_id == 1:
return self.process_creation(event)
elif event_id == 3:
return self.network_connection(event)
elif event_id in [13, 14, 15]:
return self.registry_creation(event)
return None
[docs] def process_creation(self, event: dict) -> Tuple[Process, File, Process, File]:
# Make the parent process
parent_image, parent_path = split_path(event["EventData_ParentImage"])
parent = SysMonProc(
host=event["Computer"],
process_id=int(event["EventData_ParentProcessId"]),
process_guid=event["EventData_ParentProcessGuid"],
process_image=parent_image,
process_image_path=parent_path,
)
parent_file = parent.get_file_node()
parent_file.file_of[parent]
process_image, process_path = split_path(event["EventData_Image"])
proc = SysMonProc(
host=event["Computer"],
user=event["EventData_User"],
process_guid=event["EventData_ProcessGuid"],
process_id=int(event["EventData_ProcessId"]),
process_image=process_image,
process_image_path=process_path,
command_line=event["EventData_CommandLine"],
hashes={
val.split("=")[0].lower(): val.split("=")[1]
for val in event["EventData_Hashes"].split(",")
},
)
proc_file = proc.get_file_node()
proc_file.file_of[proc]
parent.launched[proc].append(timestamp=event["EventData_UtcTime"])
return (parent, parent_file, proc, proc_file)
[docs] def network_connection(
self, event: dict
) -> Union[Tuple[Process, File, IPAddress], Tuple[Process, File, IPAddress, Domain]]:
process_image, process_path = split_path(event["EventData_Image"])
proc = SysMonProc(
host=event["Computer"],
user=event["EventData_User"],
process_guid=event["EventData_ProcessGuid"],
process_id=int(event["EventData_ProcessId"]),
process_image=process_image,
process_image_path=process_path,
)
proc_file = proc.get_file_node()
proc_file.file_of[proc]
dest_addr = IPAddress(ip_address=event["EventData_DestinationIp"])
proc.connected_to[dest_addr].append(
timestamp=event["EventData_UtcTime"],
port=event["EventData_DestinationPort"],
protocol=event["EventData_Protocol"],
)
if event.get("EventData_DestinationHostname"):
hostname = Domain(event["EventData_DestinationHostname"])
hostname.resolves_to[dest_addr].append(timestamp=event["EventData_UtcTime"])
return (proc, proc_file, dest_addr, hostname)
return (proc, proc_file, dest_addr)
[docs] def file_created(self, event: dict) -> Tuple[Process, File, File]:
process_image, process_path = split_path(event["EventData_Image"])
proc = SysMonProc(
host=event["Computer"],
user=event["EventData_User"],
process_guid=event["EventData_ProcessGuid"],
process_id=int(event["EventData_ProcessId"]),
process_image=process_image,
process_image_path=process_path,
)
proc_file = proc.get_file_node()
proc_file.file_of[proc]
file_image, file_path = split_path(event["EventData_TargetFilename"])
file_node = File(file_name=file_image, file_path=file_path)
proc.accessed[file_node].append(timestamp=event["EventData_UtcTime"])
return (proc, proc_file, file_node)
[docs] def registry_creation(self, event: dict) -> Optional[Tuple[Process, File, RegistryKey]]:
if "EventData_TargetObject" not in event:
return None
process_image, process_path = split_path(event["EventData_Image"])
proc = SysMonProc(
host=event["Computer"],
user=event.get("EventData_User"),
process_guid=event["EventData_ProcessGuid"],
process_id=int(event["EventData_ProcessId"]),
process_image=process_image,
process_image_path=process_path,
)
proc_file = proc.get_file_node()
proc_file.file_of[proc]
key_path = event["EventData_TargetObject"]
hive = key_path.split("\\")[1]
key = key_path.split("\\")[-1]
# Always has a leading \\ so split from 2:
key_path = "\\".join(key_path.split("\\")[2:-1])
key = RegistryKey(
hive=hive,
key=key,
key_path=key_path,
value=event.get("EventData_Details"),
value_type="DWORD",
)
event_type = event["EventData_EventType"]
if event_type == "SetValue":
proc.changed_value[key].append(
value=event.get("EventData_Details"), timestamp=event["EventData_UtcTime"]
)
elif event_type == "DeleteValue":
proc.deleted_value[key].append(timestamp=event["EventData_UtcTime"])
elif event_type == "CreateKey":
proc.created_key[key].append(timestamp=event["EventData_UtcTime"])
elif event_type == "DeleteKey":
proc.deleted_key[key].append(timestamp=event["EventData_UtcTime"])
return (proc, proc_file, key)