Source code for beagle.transformers.darpa_tc_transformer
from typing import List, Optional, Tuple, Union
from beagle.common import logger, split_path, split_reg_path
from beagle.nodes import File, Process, RegistryKey, IPAddress
from beagle.transformers.base_transformer import Transformer
# Custom Node classes to use the UUID in TC
class TCProcess(Process):
key_fields: List[str] = ["uuid"]
uuid: Optional[str]
def __init__(self, uuid: str = None, *args, **kwargs) -> None:
self.uuid = uuid
super().__init__(*args, **kwargs)
class TCFile(File):
key_fields: List[str] = ["uuid"]
uuid: Optional[str]
def __init__(self, uuid: str = None, *args, **kwargs) -> None:
self.uuid = uuid
super().__init__(*args, **kwargs)
class TCRegistryKey(RegistryKey):
key_fields: List[str] = ["uuid"]
uuid: Optional[str]
def __init__(self, uuid: str = None, *args, **kwargs) -> None:
self.uuid = uuid
super().__init__(*args, **kwargs)
class TCIPAddress(IPAddress):
key_fields: List[str] = ["uuid"]
uuid: Optional[str]
def __init__(self, uuid: str = None, *args, **kwargs) -> None:
self.uuid = uuid
super().__init__(*args, **kwargs)
[docs]class DRAPATCTransformer(Transformer):
name = "DARPA TC"
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
logger.info("Created Darpa Transperant Computing Transformer.")
[docs] def transform(self, event: dict) -> Optional[Tuple]:
event_type = event["event_type"]
if event_type == "subject" and event["type"] == "SUBJECT_PROCESS":
return self.make_process(event)
elif event_type == "fileobject" and event["type"] in [
"FILE_OBJECT_BLOCK",
"FILE_OBJECT_PEFILE",
]:
return self.make_file(event)
elif event_type == "registrykeyobject":
return self.make_registrykey(event)
elif event_type == "netflowobject":
return self.make_addr(event)
elif event_type == "event" and event["type"] in [
"EVENT_READ",
"EVENT_OPEN",
"EVENT_WRITE",
"EVENT_WRITE_APPEND",
"EVENT_MODIFY_FILE_ATTRIBUTES",
"EVENT_CREATE_OBJECT",
"EVENT_LOAD_LIBRARY",
]:
return self.file_events(event)
elif event_type == "event" and event["type"] == "EVENT_EXECUTE":
return self.execute_events(event)
elif event_type == "event" and event["type"] in ["EVENT_CONNECT"]:
return self.conn_events(event)
return tuple()
[docs] def make_process(self, event: dict) -> Union[Tuple[TCProcess], Tuple[TCProcess, TCProcess]]:
if event.get("cmdLine"):
proc_cmdline = event["cmdLine"]["string"]
else:
proc_cmdline = None
path = None
image = None
if event.get("properties"):
path = event["properties"]["map"].get("path")
if "/" in path:
# Swap the path directions
path = path.replace("/", "\\")
image, path = split_path(path)
proc = TCProcess(
uuid=event["uuid"],
process_image=image or proc_cmdline,
process_image_path=path or proc_cmdline,
command_line=proc_cmdline,
host=event["hostId"],
)
if event.get("parentSubject"):
parent = TCProcess(
uuid=event["parentSubject"]["com.bbn.tc.schema.avro.cdm18.UUID"],
host=event["hostId"],
)
parent.launched[proc]
return (proc, parent)
else:
return (proc,)
[docs] def make_file(self, event: dict) -> Tuple[TCFile]:
base_obj = event["baseObject"]
file_node = TCFile(uuid=event["uuid"], host=base_obj["hostId"])
# Since not everything has a full path, and this is multiple different systems,
# this is the best try for this.
if base_obj.get("properties"):
full_path = base_obj["properties"]["map"].get("filename", "")
full_path = full_path.replace("/", "\\")
file_name, file_path = split_path(full_path)
file_node.full_path = full_path
file_node.file_path = file_path
file_node.file_name = file_name
return (file_node,)
[docs] def make_registrykey(self, event: dict) -> Tuple[TCRegistryKey]:
if event["key"].startswith("\\REGISTRY\\"):
event["key"] = event["key"].replace("\\REGISTRY\\", "", 1)
hive, key, path = split_reg_path(event["key"])
base_obj = event["baseObject"]
value = event["value"]["com.bbn.tc.schema.avro.cdm18.Value"]
regkey = TCRegistryKey(
uuid=event["uuid"],
host=base_obj["hostId"],
value_type=value["valueDataType"],
value=value["name"],
hive=hive,
key_path=path,
key=key,
)
return (regkey,)
[docs] def make_addr(self, event: dict) -> Tuple[TCIPAddress]:
addr = TCIPAddress(uuid=event["uuid"], ip_address=event["remoteAddress"])
# TODO: Add port data somehow
return (addr,)
[docs] def file_events(self, event: dict) -> Tuple[TCProcess, TCFile]:
proc = TCProcess(uuid=event["subject"]["com.bbn.tc.schema.avro.cdm18.UUID"])
target = TCFile(uuid=event["predicateObject"]["com.bbn.tc.schema.avro.cdm18.UUID"])
if event["type"] in ["EVENT_READ", "EVENT_MODIFY_FILE_ATTRIBUTES", "EVENT_OPEN"]:
proc.accessed[target].append(timestamp=event["timestampNanos"])
elif event["type"] in ["EVENT_WRITE", "EVENT_WRITE_APPEND", "EVENT_CREATE_OBJECT"]:
proc.wrote[target].append(timestamp=event["timestampNanos"])
elif event["type"] in ["EVENT_LOAD_LIBRARY"]:
proc.loaded[target].append(timestamp=event["timestampNanos"])
return (proc, target)
[docs] def execute_events(self, event: dict) -> Tuple[TCProcess, TCProcess]:
proc = TCProcess(uuid=event["subject"]["com.bbn.tc.schema.avro.cdm18.UUID"])
target = TCProcess(
uuid=event["predicateObject"]["com.bbn.tc.schema.avro.cdm18.UUID"],
process_image=event.get("predicateObjectPath", {}).get("string"),
)
proc.launched[target].append(timestamp=event["timestampNanos"])
return (proc, target)
[docs] def conn_events(self, event: dict) -> Tuple[TCProcess, TCIPAddress]:
proc = TCProcess(uuid=event["subject"]["com.bbn.tc.schema.avro.cdm18.UUID"])
addr = TCIPAddress(uuid=event["predicateObject"]["com.bbn.tc.schema.avro.cdm18.UUID"])
# TODO: Need to add the port data on the edge somehow
proc.connected_to[addr].append(timestamp=event["timestampNanos"])
return (proc, addr)