import datetime
import json
import re
import tempfile
import xml.etree.ElementTree as ET
import zipfile
from typing import Dict, Generator
from beagle.common.logging import logger
from beagle.datasources.base_datasource import DataSource
from beagle.transformers.fireeye_hx_transformer import FireEyeHXTransformer
class HXTriage(DataSource):
    """A FireEye HX Triage DataSource.

    Allows generation of graphs from the redline .mans files generated by FireEye HX.

    Examples
    -------
    >>> triage = HXTriage(file_path="/path/to/triage.mans")
    """

    # Extracts the audit generator name from an audit file's XML header,
    # e.g. generator="stateagentinspector" generatorVersion="...".
    _GENERATOR_REX = re.compile(r"generator=\"(.*)\" generatorVersion")

    # Audit generators we know how to parse.
    _SUPPORTED_AUDITS = ["stateagentinspector"]

    # Alert metadata files that may appear in the triage archive.
    _ALERT_FILES = ["hits.json", "threats.json"]

    name = "FireEye HX Triage"
    transformers = [FireEyeHXTransformer]
    category = "FireEye HX"

    def __init__(self, triage: str) -> None:
        """A FireEye HX Triage DataSource.

        Unpacks the .mans archive into a temporary directory, extracting any
        alert files (hits.json / threats.json) and every audit file whose
        generator is in `_SUPPORTED_AUDITS`.

        Parameters
        ----------
        triage : str
            The path to the HX .mans file.

        Examples
        -------
        >>> triage = HXTriage(triage="/path/to/triage.mans")
        """
        self.file_path = triage
        # Tracks which of the optional alert files were present in the archive.
        self.alert_files = {"hits.json": False, "threats.json": False}
        # Maps extracted audit file name -> audit generator type.
        self.identified_files: Dict[str, str] = {}

        logger.info(f"Setting up HXTriage for {self.file_path}")

        self.tempdir = tempfile.TemporaryDirectory(suffix="_beagle")
        logger.debug(f"Generated temporary directory {self.tempdir.name}")

        # .mans files are simply zips.
        with zipfile.ZipFile(self.file_path) as mans:
            for audit_file in mans.namelist():
                # Save the alert files.
                if audit_file in self._ALERT_FILES:
                    mans.extract(audit_file, self.tempdir.name)
                    self.alert_files[audit_file] = True
                    logger.debug(f"Found alert file {audit_file}")

                # Skip files with '.' in them; audit payloads have opaque,
                # extensionless names.
                if "." in audit_file:
                    continue

                # Read just enough of the file to locate the generator attribute.
                with mans.open(audit_file) as f:
                    header = f.read(500).decode("utf-8")

                match = self._GENERATOR_REX.search(header)
                if not match:
                    continue

                version = match.groups()[0]

                # Skip audit types we cannot parse.
                if version not in self._SUPPORTED_AUDITS:
                    continue

                mans.extract(audit_file, self.tempdir.name)
                self.identified_files[audit_file] = version
                logger.debug(f"Mapped {audit_file} to {version}")

    def _hx_time_to_epoch(self, timestr: str) -> int:
        """Converts an HX time string (UTC, trailing "Z") to epoch time.

        Parameters
        ----------
        timestr : str
            Time string in format "%Y-%m-%dT%H:%M:%S.%fZ"

        Returns
        -------
        int
            epoch time
        """
        time_obj = datetime.datetime.strptime(timestr, "%Y-%m-%dT%H:%M:%S.%fZ")
        # The trailing "Z" marks the timestamp as UTC. Anchor the naive
        # datetime to UTC explicitly: the previous strftime("%s") approach was
        # a non-portable glibc extension (unavailable on Windows) and
        # interpreted the time in the machine's *local* timezone.
        return int(time_obj.replace(tzinfo=datetime.timezone.utc).timestamp())

    def _fix_missing_fields(self, event: dict) -> dict:
        """Fixes certain edge cases when events are being fed in:

        1. When `pid` == 4, there is no process or path.
            * We add in a SYSTEM node.
            {
                'remoteIP': '10.0.0.102',
                'remotePort': '445',
                'localIP': '10.0.0.61',
                'localPort': '57272',
                'protocol': 'TCP',
                'pid': '4',
                'event_type': 'ipv4NetworkEvent',
                'event_time': 1527196685
            }
        2. When `pid` == 0, there is no associated process.
            * We add in an Unknown node.
        3. When an event has data but no associated processPath:
            * Give it the Unknown
            {
                'hive': 'HKEY_LOCAL_MACHINE\\SYSTEM',
                'keyPath': 'CurrentControlSet\\services\\Tcpip\\Parameters',
                'path': 'HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\services\\Tcpip\\Parameters',
                'originalPath': 'HKEY_LOCAL_MACHINE\\SYSTEM\\ControlSet001\\services\\Tcpip\\Parameters',
                'eventType': '3',
                'pid': '3000',
                'event_time': 1527622979
            }

        Parameters
        ----------
        event : dict
            Event from parser.

        Returns
        -------
        dict
            Cleaned event.
        """
        # Well-known Windows pseudo-processes and the fields they should carry.
        pid_values = {
            "4": {"processPath": "\\", "process": "SYSTEM", "username": "NT AUTHORITY\\SYSTEM"},
            "0": {"processPath": "\\", "process": "System Idle Process", "username": "Unknown"},
        }

        # Fix cases 1 and 2.
        if event.get("pid") in pid_values:
            event.update(pid_values[event["pid"]])

        # Case 3: there's a PID but no path, update with unknown.
        if "pid" in event and "processPath" not in event:
            event.update(pid_values["0"])

        return event

    def parse_agent_events(self, agent_events_file: str) -> Generator[dict, None, None]:
        """Generator over the agent events file. Converts each XML into a dictionary.
        Timestamps are converted to epoch time.

        The below XML entry::

            <eventItem uid="39265403">
                <timestamp>2018-06-27T21:15:32.678Z</timestamp>
                <eventType>dnsLookupEvent</eventType>
                <details>
                    <detail>
                        <name>hostname</name>
                        <value>github.com</value>
                    </detail>
                    <detail>
                        <name>pid</name>
                        <value>12345</value>
                    </detail>
                    <detail>
                        <name>process</name>
                        <value>git.exe</value>
                    </detail>
                    <detail>
                        <name>processPath</name>
                        <value>c:\\windows\\</value>
                    </detail>
                    <detail>
                        <name>username</name>
                        <value>Bob/Schmob</value>
                    </detail>
                </details>
            </eventItem>

        becomes::

            {
                "timestamp": 1530134132,
                "eventType": "dnsLookupEvent",
                "hostname": "github.com",
                "pid": "12345",
                "process": "git.exe",
                "processPath": "c:\\windows\\",
                "username": "Bob/Schmob",
            }

        Parameters
        ----------
        agent_events_file : str
            The path to the file containing the agent events.

        Returns
        -------
        Generator[dict, None, None]
            Generator over agent events.
        """
        with open(agent_events_file, "r") as agent_events_file_handle:
            xml = ET.iterparse(agent_events_file_handle, events=("start", "end"))
            for _event, elem in xml:
                # Only process fully-parsed <eventItem> elements.
                if elem.tag == "eventItem" and _event == "end":
                    # Fixed layout: <timestamp> first, <eventType> second,
                    # <details> last.
                    event_time = elem[0].text
                    event_type = elem[1].text

                    # Each <detail> child holds a <name>/<value> pair.
                    event = {detail[0].text: detail[1].text for detail in elem[-1]}

                    event["event_type"] = event_type
                    event["event_time"] = self._hx_time_to_epoch(event_time)

                    event = self._fix_missing_fields(event)

                    # Release memory held by the processed element.
                    elem.clear()

                    yield event

    def parse_alert_files(self, temp_dir: str) -> Generator[dict, None, None]:
        """Parses out the alert files from the hits.json and threats.json files

        Parameters
        ----------
        temp_dir : str
            Folder which contains the expanded triage.

        Yields
        -------
        Generator[dict, None, None]
            The next event found in the Triage.
        """
        threats = None

        # We will always have 'hits.json'
        try:
            with open(f"{temp_dir}/hits.json", "r") as hits_file:
                hits = json.load(hits_file)
        except Exception as e:
            logger.warning("Could not load JSON from hits.json, skipping alerts!")
            logger.debug(e)
            return

        # threats.json is optional; without it alert names stay as threat UUIDs.
        if self.alert_files["threats.json"]:
            try:
                with open(f"{temp_dir}/threats.json", "r") as threats_file:
                    threats = json.load(threats_file)
            except Exception as e:
                logger.warning("Could not load JSON from threats.json, alert names may be UUIDs")
                logger.debug(e)
        else:
            logger.info("Could not find threats.json, alert names may be UUIDs")

        for alert in hits:
            # Introduce an alerting event type for HX.
            alert["event_type"] = "alertEvent"

            # If we have the threats file, convert the "threat_id" portion of the alert to
            # understandable values.
            if threats:
                threat = next(
                    filter(lambda threat_entry: threat_entry["_id"] == alert["threat_id"], threats),
                    None,
                )
                if threat:
                    logger.info(
                        f"Matched up {alert['threat_id']} to {threat.get('display_name', threat['uri_name'])}"
                    )
                    alert["_threat_data"] = threat

            # Add the time the alert happend
            alert["event_time"] = self._hx_time_to_epoch(alert["matched_at"])

            alerting_event_type = alert["data"]["key"]["event_type"]

            # Strip the event type prefix (e.g. "fileWriteEvent/") from each key.
            alerting_event = {
                k.replace(f"{alerting_event_type}/", ""): v
                for k, v in alert["data"]["values"].items()
            }

            alerting_event["event_time"] = self._hx_time_to_epoch(alerting_event["timestamp"])

            alert["data"]["values"] = self._fix_missing_fields(alerting_event)

            yield alert

    def events(self) -> Generator[dict, None, None]:
        """Yields each event in the triage from the supported files."""
        for audit_file, audit_type in self.identified_files.items():
            temp_file_path = f"{self.tempdir.name}/{audit_file}"

            if audit_type == "stateagentinspector":
                yield from self.parse_agent_events(temp_file_path)

        # If we have atleast the hits.json file, we can make alert nodes
        if self.alert_files["hits.json"]:
            yield from self.parse_alert_files(self.tempdir.name)

        # Remove the extracted triage contents once all events are consumed.
        self.tempdir.cleanup()