Source code for beagle.datasources.hx_triage

import datetime
import json
import re
import tempfile
import xml.etree.ElementTree as ET
import zipfile
from typing import Dict, Generator

from beagle.common.logging import logger
from beagle.datasources.base_datasource import DataSource
from beagle.transformers.fireeye_hx_transformer import FireEyeHXTransformer


class HXTriage(DataSource):
    """A FireEye HX Triage DataSource.

    Allows generation of graphs from the redline .mans files generated by
    FireEye HX.

    Examples
    --------
    >>> triage = HXTriage(triage="/path/to/triage.mans")
    """

    # Captures the ``generator`` attribute found in the header of an audit file.
    _GENERATOR_REX = re.compile(r"generator=\"(.*)\" generatorVersion")
    # Audit generators this datasource knows how to parse.
    _SUPPORTED_AUDITS = ["stateagentinspector"]
    # Archive members that carry alert information.
    _ALERT_FILES = ["hits.json", "threats.json"]

    name = "FireEye HX Triage"
    transformers = [FireEyeHXTransformer]
    category = "FireEye HX"

    def __init__(self, triage: str) -> None:
        """A FireEye HX Triage DataSource.

        Opens the triage archive and extracts the alert files and every
        supported audit file into a temporary directory.

        Parameters
        ----------
        triage : str
            The path to the HX .mans file.

        Examples
        --------
        >>> triage = HXTriage(triage="/path/to/triage.mans")
        """
        self.file_path = triage

        # Which alert files were found in the archive.
        self.alert_files = {"hits.json": False, "threats.json": False}

        # Maps extracted audit file name -> generator name.
        self.identified_files: Dict[str, str] = {}

        logger.info(f"Setting up HXTriage for {self.file_path}")

        self.tempdir = tempfile.TemporaryDirectory(suffix="_beagle")

        logger.debug(f"Generated temporary directory {self.tempdir.name}")

        # .mans files are simply zips.
        with zipfile.ZipFile(self.file_path) as mans:
            for audit_file in mans.namelist():

                # Save the alert files
                if audit_file in self._ALERT_FILES:
                    mans.extract(audit_file, f"{self.tempdir.name}")
                    self.alert_files[audit_file] = True
                    logger.debug(f"Found alert file {audit_file}")

                # Skip files with '.' in them
                if "." in audit_file:
                    continue

                # Get the audit type. Only the first 500 bytes are inspected, so
                # the cut may land inside a multi-byte character (or the member
                # may not be text at all): decode leniently instead of raising.
                with mans.open(audit_file) as f:
                    header = f.read(500).decode("utf-8", errors="replace")

                match = self._GENERATOR_REX.search(header)
                if not match:
                    continue

                version = match.group(1)

                # Skip if not supported
                if version not in self._SUPPORTED_AUDITS:
                    continue

                mans.extract(audit_file, f"{self.tempdir.name}")
                self.identified_files[audit_file] = version
                logger.debug(f"Mapped {audit_file} to {version}")

    def _hx_time_to_epoch(self, timestr: str) -> int:  # pragma: no cover
        """Converts an HX Time string to epoch time

        Parameters
        ----------
        timestr : str
            Time string in format "%Y-%m-%dT%H:%M:%S.%fZ"

        Returns
        -------
        int
            epoch time (whole seconds, fractional part truncated)

        Raises
        ------
        ValueError
            If ``timestr`` does not match the expected format.
        """
        time_obj = datetime.datetime.strptime(timestr, "%Y-%m-%dT%H:%M:%S.%fZ")
        # The trailing "Z" marks the value as UTC. strftime("%s") is not
        # portable and would interpret the naive datetime as local time, so
        # attach UTC explicitly before converting.
        return int(time_obj.replace(tzinfo=datetime.timezone.utc).timestamp())

    def _fix_missing_fields(self, event: dict) -> dict:
        """Fixes certain edge cases when events are being fed in:

        1. When `pid` == 4, there is no process or path.
            * We add in a SYSTEM node.

            {
                'remoteIP': '10.0.0.102',
                'remotePort': '445',
                'localIP': '10.0.0.61',
                'localPort': '57272',
                'protocol': 'TCP',
                'pid': '4',
                'event_type': 'ipv4NetworkEvent',
                'event_time': 1527196685
            }

        2. When `pid` == 0, there is no associated process.
            * We add in a "System Idle Process" node with an Unknown user.

        3. When an event has a pid but no associated processPath:
            * Give it the same values as case 2.

            {
                'hive': 'HKEY_LOCAL_MACHINE\\SYSTEM',
                'keyPath': 'CurrentControlSet\\services\\Tcpip\\Parameters',
                'path': 'HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\services\\Tcpip\\Parameters',
                'originalPath': 'HKEY_LOCAL_MACHINE\\SYSTEM\\ControlSet001\\services\\Tcpip\\Parameters',
                'eventType': '3',
                'pid': '3000',
                'event_time': 1527622979
            }

        Parameters
        ----------
        event : dict
            Event from parser. It is updated in place.

        Returns
        -------
        dict
            Cleaned event (the same object that was passed in).
        """
        pid_values = {
            "4": {"processPath": "\\", "process": "SYSTEM", "username": "NT AUTHORITY\\SYSTEM"},
            "0": {"processPath": "\\", "process": "System Idle Process", "username": "Unknown"},
        }

        # Fix cases 1 and 2.
        if event.get("pid") in pid_values:
            event.update(pid_values[event["pid"]])

        # If there's a PID but no path, update with unknown.
        if "pid" in event and "processPath" not in event:
            event.update(pid_values["0"])

        return event

    def parse_agent_events(self, agent_events_file: str) -> Generator[dict, None, None]:
        """Generator over the agent events file. Converts each XML into a dictionary.
        Timestamps are converted to epoch time.

        The below XML entry::

            <eventItem uid="39265403">
                <timestamp>2018-06-27T21:15:32.678Z</timestamp>
                <eventType>dnsLookupEvent</eventType>
                <details>
                    <detail>
                        <name>hostname</name>
                        <value>github.com</value>
                    </detail>
                    <detail>
                        <name>pid</name>
                        <value>12345</value>
                    </detail>
                    <detail>
                        <name>process</name>
                        <value>git.exe</value>
                    </detail>
                    <detail>
                        <name>processPath</name>
                        <value>c:\\windows\\</value>
                    </detail>
                    <detail>
                        <name>username</name>
                        <value>Bob/Schmob</value>
                    </detail>
                </details>
            </eventItem>

        becomes::

            {
                "event_time": 1530134132,
                "event_type": "dnsLookupEvent",
                "hostname": "github.com",
                "pid": "12345",
                "process": "git.exe",
                "processPath": "c:\\windows\\",
                "username": "Bob/Schmob",
            }

        Parameters
        ----------
        agent_events_file : str
            The path to the file containing the agent events.

        Returns
        -------
        Generator[dict, None, None]
            Generator over agent events.
        """
        # Binary mode lets the XML parser apply the document's own encoding
        # instead of the locale default; the ``with`` block guarantees the
        # handle is closed when the generator finishes or is closed early.
        with open(agent_events_file, "rb") as agent_events_file_handle:
            # Only completed elements are of interest, so only "end" events
            # are requested.
            for _event, elem in ET.iterparse(agent_events_file_handle, events=("end",)):
                if elem.tag != "eventItem":
                    continue

                # Children are read by position: timestamp first, event type
                # second, and the <details> container last.
                event_time = elem[0].text
                event_type = elem[1].text

                event = {detail[0].text: detail[1].text for detail in elem[-1]}
                event["event_type"] = event_type
                event["event_time"] = self._hx_time_to_epoch(event_time)

                event = self._fix_missing_fields(event)

                # Release the children of the parsed element to keep memory
                # usage down.
                elem.clear()

                yield event

    def parse_alert_files(self, temp_dir: str) -> Generator[dict, None, None]:
        """Parses out the alert files from the hits.json and threats.json files

        Parameters
        ----------
        temp_dir : str
            Folder which contains the expanded triage.

        Yields
        ------
        dict
            The next alert found in the Triage, tagged with the
            ``alertEvent`` event type.
        """
        threats = None

        # We will always have 'hits.json'
        try:
            # Binary mode lets ``json`` detect the encoding itself.
            with open(f"{temp_dir}/hits.json", "rb") as hits_file:
                hits = json.load(hits_file)
        except Exception as e:
            logger.warning("Could not load JSON from hits.json, skipping alerts!")
            logger.debug(e)
            return

        if self.alert_files["threats.json"]:
            try:
                with open(f"{temp_dir}/threats.json", "rb") as threats_file:
                    threats = json.load(threats_file)
            except Exception as e:
                logger.warning("Could not load JSON from threats.json, alert names may be UUIDs")
                logger.debug(e)
        else:
            logger.info("Could not find threats.json, alert names may be UUIDs")

        for alert in hits:

            # Introduce an alerting event type for HX.
            alert["event_type"] = "alertEvent"

            # If we have the threats file, convert the "threat_id" portion of the alert to
            # understandable values.
            if threats:
                threat = next(
                    (
                        threat_entry
                        for threat_entry in threats
                        if threat_entry["_id"] == alert["threat_id"]
                    ),
                    None,
                )

                if threat:
                    # Fall back to the URI name lazily so that an entry which
                    # only has a display name does not raise.
                    threat_name = threat.get("display_name", threat.get("uri_name"))
                    logger.info(f"Matched up {alert['threat_id']} to {threat_name}")
                    alert["_threat_data"] = threat

            # Add the time the alert happened
            alert["event_time"] = self._hx_time_to_epoch(alert["matched_at"])

            alerting_event_type = alert["data"]["key"]["event_type"]

            # Strip the event type
            alerting_event = {
                k.replace(f"{alerting_event_type}/", ""): v
                for k, v in alert["data"]["values"].items()
            }

            alerting_event["event_time"] = self._hx_time_to_epoch(alerting_event["timestamp"])

            alert["data"]["values"] = self._fix_missing_fields(alerting_event)

            yield alert

    def events(self) -> Generator[dict, None, None]:
        """Yields each event in the triage from the supported files.

        The temporary extraction directory is removed once iteration ends,
        so the events can only be generated once per instance.
        """
        try:
            for audit_file, audit_type in self.identified_files.items():

                temp_file_path = f"{self.tempdir.name}/{audit_file}"

                if audit_type == "stateagentinspector":
                    yield from self.parse_agent_events(temp_file_path)

            # If we have atleast the hits.json file, we can make alert nodes
            if self.alert_files["hits.json"]:
                yield from self.parse_alert_files(self.tempdir.name)
        finally:
            # Clean up even when the consumer stops early or parsing fails.
            self.tempdir.cleanup()

    def metadata(self) -> dict:
        """Returns basic information about the triage.

        1. Agent ID
        2. Hostname
        3. Platform (win, osx, linux)
        4. Triggering Alert name (if exists)
        5. Link to the controller the triage is from

        Returns
        -------
        dict
            Metadata for the submitted HX Triage. Empty if the metadata could
            not be read or parsed.
        """
        try:
            # Both the archive and the member are closed on exit.
            with zipfile.ZipFile(self.file_path) as mans, mans.open("metadata.json") as meta:
                metadata = json.load(meta)

            if "hit" in metadata:
                # An empty or missing threats list must not discard the rest
                # of the metadata.
                threats = metadata["hit"].get("threats") or [{}]
                display_name = threats[0].get("display_name")
                uri_name = threats[0].get("uri_name")

                alert_name = display_name or uri_name or "Unknown Alert Name"
            else:
                alert_name = "No Alert"

            agent = metadata["agent"]
            # Each field falls back to "Unknown" on its own, whether sysinfo is
            # missing entirely or only lacks that one key.
            sysinfo = agent.get("sysinfo") or {}

            return {
                "hostname": sysinfo.get("hostname", "Unknown"),
                "agent_id": agent["_id"],
                "alert_name": alert_name,
                "platform": sysinfo.get("platform", "Unknown"),
                "domain": sysinfo.get("domain", "Unknown"),
                "controller_link": f'{metadata["appliance_uri"]}/hx/hosts/{agent["_id"]}',
            }
        except Exception as e:
            # Deliberately best-effort: missing metadata should not prevent
            # the triage from being processed.
            logger.error(f"Could not parse triage's metadata due to {e}")
            return {}