Source code for beagle.transformers.fireeye_hx_transformer
from typing import Dict, Optional, Tuple, Union
from beagle.common import logger, split_path
from beagle.constants import Protocols
from beagle.nodes import URI, Domain, File, IPAddress, Node, Process, RegistryKey, Alert
from beagle.transformers.base_transformer import Transformer
[docs]class FireEyeHXTransformer(Transformer):
name = "FireEye HX"
def __init__(self, *args, **kwargs) -> None:
    """Create the transformer.

    All positional and keyword arguments are forwarded untouched to the
    parent ``Transformer`` constructor.
    """
    super().__init__(*args, **kwargs)

    # Leave a trace in the log so it is visible which transformer was built.
    logger.info("Created FireEyeHX Transformer.")
def transform(self, event: dict) -> Optional[Tuple[Node, ...]]:
    """Sends each event from the FireEye HX Triage to the appropriate
    node creation function.

    Parameters
    ----------
    event : dict
        The source event from the HX Triage. Must contain an ``event_type`` key.

    Returns
    -------
    Optional[Tuple[Node, ...]]
        The results of the transforming function, or ``None`` when the event
        type is not handled or the event carries no usable ``processPath``.
    """
    event_type = event["event_type"]

    # NOTE: Manually created event_type in HXTriage
    # Check happens before the process path because
    # we don't expect these events to have that.
    if event_type == "alertEvent":
        return self.make_alert(event)

    # Every handler below reads event["processPath"] to build its Process node.
    # A missing key is just as unusable as an empty value, so both are skipped
    # here instead of letting the handler fail with a KeyError.
    if not event.get("processPath"):
        return None

    # Event types from the agent events.
    if event_type == "processEvent":
        return self.make_process(event)
    elif event_type == "fileWriteEvent":
        return self.make_file(event)
    elif event_type == "urlMonitorEvent":
        return self.make_url(event)
    elif event_type == "ipv4NetworkEvent":
        return self.make_network(event)
    elif event_type == "dnsLookupEvent":
        return self.make_dnslookup(event)
    elif event_type == "imageLoadEvent":
        return self.make_imageload(event)
    elif event_type == "regKeyEvent":
        return self.make_registry(event)

    # Unknown event types are ignored.
    return None
def make_process(
    self, event: dict
) -> Optional[Union[Tuple[Process, File], Tuple[Process, File, Process, File]]]:
    """Converts a processEvent into either one Process node, or two Process nodes with a
    parent - (Launched) -> child relationship.

    Additionally, creates File nodes for the images of each of the processes identified.

    Parameters
    ----------
    event : dict
        The processEvent event

    Returns
    -------
    Optional[Union[Tuple[Process, File], Tuple[Process, File, Process, File]]]
        ``None`` when the event is skipped, ``(child, child_file)`` when the event
        has no ``parentProcessPath``, otherwise
        ``(parent, parent_file, child, child_file)`` where the parent has a
        launched edge to the child.
    """
    # Only look at start or already running event types.
    if event["eventType"] not in ["start", "running"]:
        return None

    # Sometimes a process in a `running` state doesn't properly identify the path
    # which causes a case where processPath == process.
    # For example {"process": "cmd.exe", "processPath": "cmd.exe"}
    # See: test_process_equal_processpath
    if event["processPath"] == event["process"]:
        return None

    process_image, process_image_path = split_path(event["processPath"])

    # Pull out the hashes
    hashes: Dict[str, str] = {}
    if event.get("md5"):
        hashes = {"md5": event["md5"]}

    # Create the "child" process. This is the process who this event
    # belongs to.
    child = Process(
        process_image=process_image,
        process_image_path=process_image_path,
        command_line=event.get("processCmdLine"),
        process_id=int(event["pid"]),
        hashes=hashes,
        user=event.get("username", None),
    )

    # Pull out the image of the child process
    child_proc_file_node = child.get_file_node()

    # File - (File Of) -> Process
    child_proc_file_node.file_of[child]

    # If there is no parent field, return just the child
    if "parentProcessPath" not in event:
        return (child, child_proc_file_node)

    parent_process_image, parent_process_image_path = split_path(event["parentProcessPath"])

    # Create the parent process
    parent = Process(
        process_id=int(event["parentPid"]),
        process_image=parent_process_image,
        process_image_path=parent_process_image_path,
    )

    # Create a parent - (launched) -> child edge.
    parent.launched[child].append(timestamp=event["event_time"])

    # Pull out the image of the parent process. This must come from the
    # parent (not the child), otherwise the parent never gets its own File
    # node and the child's file is returned twice.
    parent_proc_file_node = parent.get_file_node()

    # File - (File Of) -> Process
    parent_proc_file_node.file_of[parent]

    return (parent, parent_proc_file_node, child, child_proc_file_node)
def make_file(self, event: dict) -> Optional[Tuple[File, Process, File]]:
    """Turns a fileWriteEvent into the written File, the writing Process and
    the File backing that Process.

    Edges created:
        1. Process - (Wrote) -> File
        2. File - (File Of) -> Process

    Parameters
    ----------
    event : dict
        The fileWriteEvent event.

    Returns
    -------
    Optional[Tuple[File, Process, File]]
        ``None`` when the event has no ``filePath``; otherwise a tuple of the
        written file, the process that wrote it, and the file node of the
        process image.
    """
    # Without a path there is nothing to build a File node from.
    if "filePath" not in event:
        return None

    # Prefix the drive letter when the event supplies one.
    drive = event.get("drive")
    target_path = f"{drive}:\\{event['filePath']}" if drive else event["filePath"]

    md5 = event.get("md5")
    file_hashes: Dict[str, str] = {"md5": md5} if md5 else {}

    # The file that was written to.
    written_file = File(file_path=target_path, file_name=event["fileName"], hashes=file_hashes)
    written_file.set_extension()

    # The process that performed the write.
    writer_fields = {
        "process_id": int(event["pid"]),
        "process_image": event["process"],
        "process_image_path": event["processPath"],
        "user": event.get("username"),
    }
    writer = Process(**writer_fields)

    # Process - (Wrote) -> File, carrying the written contents when present.
    writer.wrote[written_file].append(
        timestamp=int(event["event_time"]),
        contents=event.get("textAtLowestOffset"),
    )

    # File - (File Of) -> Process
    writer_image = writer.get_file_node()
    writer_image.file_of[writer]

    return (written_file, writer, writer_image)
def make_url(self, event: dict) -> Optional[Tuple[URI, Domain, Process, File, IPAddress]]:
    """Turns a urlMonitorEvent into 5 nodes joined by 5 relationships.

    Nodes created:
        1. URI accessed (e.g /foobar)
        2. Domain accessed (e.g omer.com)
        3. Process performing the URL request.
        4. File object for the Process image.
        5. IP Address the domain resolves to.

    Relationships created:
        1. URI - (URI Of) -> Domain
        2. Domain - (Resolves To) -> IP Address
        3. Process - (HTTP Request To) -> URI
        4. Process - (Connected To) -> IP Address
        5. File - (File Of) -> Process

    Parameters
    ----------
    event : dict
        The urlMonitorEvent event.

    Returns
    -------
    Optional[Tuple[URI, Domain, Process, File, IPAddress]]
        The URI, Domain, Process, process image File and IP Address nodes.
    """
    # Network-side nodes.
    requested_uri = URI(uri=event["requestUrl"])
    host = Domain(domain=event["hostname"])
    remote_ip = IPAddress(ip_address=event["remoteIpAddress"])

    # The process that issued the request.
    proc_fields = {
        "process_image": event["process"],
        "process_image_path": event["processPath"],
        "command_line": event.get("processCmdLine"),
        "process_id": int(event["pid"]),
        "user": event.get("username"),
    }
    proc = Process(**proc_fields)

    # File - (File Of) -> Process
    image_file = proc.get_file_node()
    image_file.file_of[proc]

    # URI - (URI Of) -> Domain
    uri_edge = requested_uri.uri_of[host]
    uri_edge.append(timestamp=event["event_time"])

    # Process - (HTTP Request To) -> URI
    request_edge = proc.http_request_to[requested_uri]
    request_edge.append(
        method=event.get("urlMethod"),
        user_agent=event.get("userAgent"),
        header=event.get("httpHeader"),
        timestamp=event["event_time"],
    )

    # Process - (Connected To) -> IP Address
    connection_edge = proc.connected_to[remote_ip]
    connection_edge.append(
        port=int(event["remotePort"]),
        protocol=Protocols.HTTP,
        timestamp=event["event_time"],
    )

    # Domain - (Resolves To) -> IP Address
    resolution_edge = host.resolves_to[remote_ip]
    resolution_edge.append(timestamp=event["event_time"])

    return (requested_uri, host, proc, image_file, remote_ip)
def make_network(self, event: dict) -> Optional[Tuple[IPAddress, Process, File]]:
    """Turns an ipv4NetworkEvent into an IP Address, a Process and the
    Process's File node.

    Nodes:
        1. IP Address communicated to.
        2. Process contacting the IP.
        3. File the process was launched from.

    Edges:
        1. Process - (Connected To) -> IP Address
        2. File - (File Of) -> Process

    Parameters
    ----------
    event : dict
        The ipv4NetworkEvent

    Returns
    -------
    Optional[Tuple[IPAddress, Process, File]]
        The IP Address, Process, and Process's File object.
    """
    # The process that opened the connection.
    proc_fields = {
        "process_image": event["process"],
        "process_image_path": event["processPath"],
        "process_id": int(event["pid"]),
        "user": event.get("username"),
    }
    proc = Process(**proc_fields)

    # File - (File Of) -> Process
    image_file = proc.get_file_node()
    image_file.file_of[proc]

    # The remote end of the connection.
    remote_ip = IPAddress(event["remoteIP"])

    # Process - (Connected To) -> IP Address
    connection_edge = proc.connected_to[remote_ip]
    connection_edge.append(
        timestamp=event["event_time"],
        protocol=event["protocol"],
        port=int(event["remotePort"]),
    )

    return (remote_ip, proc, image_file)
def make_dnslookup(self, event: dict) -> Optional[Tuple[Domain, Process, File]]:
    """Turns a dnsLookupEvent into a Domain, a Process and the Process's File node.

    Nodes:
        1. Domain looked up.
        2. Process performing the lookup.
        3. File the Process was launched from.

    Edges:
        1. Process - (DNS Query For) -> Domain.
        2. File - (File Of) -> Process.

    Parameters
    ----------
    event : dict
        A dnsLookupEvent

    Returns
    -------
    Optional[Tuple[Domain, Process, File]]
        The Domain, Process, and File nodes.
    """
    # The process that performed the lookup.
    proc_fields = {
        "process_image": event["process"],
        "process_image_path": event["processPath"],
        "process_id": int(event["pid"]),
        "user": event.get("username"),
    }
    proc = Process(**proc_fields)

    # File - (File Of) -> Process
    image_file = proc.get_file_node()
    image_file.file_of[proc]

    # Process - (DNS Query For) -> Domain
    looked_up = Domain(event["hostname"])
    query_edge = proc.dns_query_for[looked_up]
    query_edge.append(timestamp=event["event_time"])

    return (looked_up, proc, image_file)
def make_imageload(self, event: dict) -> Optional[Tuple[File, Process, File]]:
    """Turns an imageLoadEvent into the loaded File, the loading Process and
    the File backing that Process.

    Edges:
        1. Process - (Loaded) -> File
        2. File - (File Of) -> Process

    Parameters
    ----------
    event : dict
        The imageLoadEvent

    Returns
    -------
    Optional[Tuple[File, Process, File]]
        The loaded file, the process that loaded it, and the file node of the
        process image.
    """
    # The process that loaded the image.
    proc_fields = {
        "process_image": event["process"],
        "process_image_path": event["processPath"],
        "process_id": int(event["pid"]),
        "user": event.get("username"),
    }
    proc = Process(**proc_fields)

    # File - (File Of) -> Process
    image_file = proc.get_file_node()
    image_file.file_of[proc]

    # Prefix the drive letter when the event supplies one.
    drive = event.get("drive")
    loaded_path = f"{drive}:\\{event['filePath']}" if drive else event["filePath"]

    loaded_image = File(file_path=loaded_path, file_name=event["fileName"])
    loaded_image.set_extension()

    # Process - (Loaded) -> File
    load_edge = proc.loaded[loaded_image]
    load_edge.append(timestamp=event["event_time"])

    return (loaded_image, proc, image_file)
def make_registry(self, event: dict) -> Optional[Tuple[RegistryKey, Process, File]]:
    """Converts a regKeyEvent into a RegistryKey, Process, and Process's File node.

    Edges:
        1. Process - (Changed Value | Deleted Value | Created Key | Deleted Key)
           -> RegistryKey, chosen from the event's ``eventType``.
        2. File - (File Of) -> Process

    An unrecognised ``eventType`` is logged as a warning; the nodes are still
    returned, just without a process -> registry key edge.

    Parameters
    ----------
    event : dict
        The regKeyEvent

    Returns
    -------
    Optional[Tuple[RegistryKey, Process, File]]
        The registry key, the process that touched it, and the file node of
        the process image.
    """
    # Pull out the process fields.
    process = Process(
        process_image=event["process"],
        process_image_path=event["processPath"],
        process_id=int(event["pid"]),
        user=event.get("username", None),
    )

    # Pull out the image of the process
    file_node = process.get_file_node()

    # File - (File Of) -> Process
    file_node.file_of[process]

    # RegistryKey Node Creation
    reg_node = RegistryKey(
        hive=event["hive"],
        key_path=event["keyPath"],
        key=event.get("valueName"),
        value=event.get("text"),
        value_type=event.get("valueType"),
    )

    # Space shuttle code for the edge setting
    #
    # EventType Mappings:
    #     1: Value Changed
    #     2: Value Deleted
    #     3: Key Created
    #     4: Key Deleted
    #
    # Compared as a string so both 1 and "1" are accepted.
    reg_event_type = str(event["eventType"])

    if reg_event_type == "1":
        process.changed_value[reg_node].append(timestamp=event["event_time"])
    elif reg_event_type == "2":
        process.deleted_value[reg_node].append(timestamp=event["event_time"])
    elif reg_event_type == "3":
        process.created_key[reg_node].append(timestamp=event["event_time"])
    elif reg_event_type == "4":
        process.deleted_key[reg_node].append(timestamp=event["event_time"])
    else:
        # `warn` is a deprecated alias on stdlib loggers; `warning` is the
        # supported spelling.
        logger.warning(
            f"Found a new registry event type with a value of {reg_event_type}: {event}"
        )

    return (reg_node, process, file_node)
def make_alert(self, event: dict) -> Optional[Tuple[Alert, ...]]:
    """Converts an alertEvent into an Alert node plus the nodes of the event
    that was alerted on.

    The alerted-on event (``event["data"]["values"]``) is tagged with the
    event type found in ``event["data"]["key"]["event_type"]`` and run back
    through ``transform``. Every node that produces receives an
    Alert - (Alerted On) -> Node edge.

    Parameters
    ----------
    event : dict
        The alertEvent

    Returns
    -------
    Optional[Tuple[Alert, ...]]
        The alert node, followed by all nodes created from the alerted-on
        event (if any).
    """
    # Fixes issue where no event metadata in the Triage.
    if "_threat_data" not in event:
        alert_name = event["match_hash"]
    else:
        uri_name = event["_threat_data"].get("uri_name")
        display_name = event["_threat_data"].get("display_name")

        # Prefer the display name, falling back to the URI name.
        alert_name = display_name or uri_name

    alert = Alert(alert_name=alert_name, alert_data="No data")

    alerting_event_type = event["data"]["key"]["event_type"]

    # Tag a shallow copy of the alerted-on event with its type, so the
    # caller's event dictionary is not modified as a side effect.
    alerting_event = dict(event["data"]["values"])
    alerting_event["event_type"] = alerting_event_type

    # Create all nodes from the alerted on event
    alerted_on_nodes = self.transform(alerting_event)

    if not alerted_on_nodes:
        return (alert,)

    # Set edges from the alert node to all alerted on nodes.
    for node in alerted_on_nodes:
        alert.alerted_on[node].append(timestamp=event["event_time"])

    # This returns a tuple comprised of the alert node, and all alerted-on nodes
    return (alert,) + alerted_on_nodes  # type: ignore