Source code for beagle.transformers.pcap_transformer

from typing import Dict, Optional, Tuple

from beagle.common import logger
from beagle.nodes import URI, Domain, IPAddress, Node
from beagle.transformers.base_transformer import Transformer


[docs]class PCAPTransformer(Transformer): name = "PCAP" def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) logger.info("Created PCAP Transformer")
[docs] def transform(self, event: Dict) -> Optional[Tuple[Node, ...]]: event_type = event["event_type"] # Skip all ether events, no src/dst IP if event_type in ["Ether", "IP"] or "src_ip" not in event or "dst_ip" not in event: return None src = IPAddress(ip_address=event["src_ip"], mac=event["src_mac"]) dst = IPAddress(ip_address=event["dst_ip"], mac=event["dst_mac"]) src.connected_to[dst].append( port=event["dport"], protocol=event["protocol"], payload=event["payload"], timestamp=event["timestamp"], ) if event_type == "HTTPRequest": dom = Domain(event["http_dest"]) uri = URI(event["uri"]) src.http_request_to[uri].append( method=event["http_method"], timestamp=event["timestamp"] ) dom.resolves_to[dst] uri.uri_of[dom] return (src, dst, dom, uri) if event_type == "DNS": if event["qname"][-1] == ".": event["qname"] = event["qname"][:-1] dom = Domain(event["qname"]) src.dns_query_for[dom].append(record_type=event["qtype"], timestamp=event["timestamp"]) if "qanswer" in event: ip = IPAddress(event["qanswer"]) dom.resolves_to[ip].append(timestamp=event["timestamp"]) return (src, dom, ip, dst) return (src, dom, dst) return (src, dst)