from typing import Any, Dict, List
[docs]class Edge(object):
"""The base Edge class.
An edge simply stores metadata about interaction between two nodes. Each
Edge object is simply meant to store the metadata **on that Edge**. For example,
for a file write event, it may want to store the time of the write, and the contents
written.
Since a write by Process A to File B may occur multiple times, all the properties
stored on the edge must be arrays. When generating the graph, Beagle will either unpack
all N properties into N edges or create a single edge with all the metadata. This will
depend on the configuration for that run.
Examples
--------
The below shows an Edge which represents a process launch. The edge contains
a list of timestamps at which the parent process launched the child process::
class Launched(Edge):
__name__ = "Launched"
timestamp: int
def __init__(self) -> None:
super().__init__()
The edge would be used in the Process class as follows::
class Process(Node):
...
# List of launched processes
launched: DefaultDict["Process", Launched]
This would allow a process `parent` to add that it launched `child` at time 145:
>>> proc.launched[child].append(timestamp=145)
You can also add edges without explicitly adding data:
>>> proc.launched[child]
"""
_events: List[dict] = []
def __init__(self) -> None:
self._events = []
def __add__(self, data: Dict[Any, Any]) -> "Edge":
for key, value in data.items():
if key not in self.__annotations__:
raise RuntimeError(
f"{key} is not a valid field for a {self.__class__.__name__} edge. "
+ f"Valid fields are {self.__annotations__}"
)
entry = {k: None for k, _ in self.__annotations__.items()}
entry.update(data)
self._events.append(entry)
return self
@property
def _display(self):
return self.__name__
[docs] def append(self, **kwargs) -> None:
"""Appends the keyword arguments as an entry on the edge
Examples
--------
>>> proc.launched[child].append(timestamp=145)
>>> proc.launched[child].append(**{"timestamp": 145})
"""
for key, value in kwargs.items():
if key not in self.__annotations__:
raise RuntimeError(
f"{key} is not a valid field for a {self.__class__.__name__} edge."
+ f"Valid fields are {self.__annotations__}"
)
entry = {k: None for k, _ in self.__annotations__.items()}
entry.update(kwargs)
self._events.append(entry)
def __contains__(self, data: Dict[Any, Any]):
found = {k: False for k in data.keys()}
for event in self._events:
for key, value in data.items():
if event[key] == value:
found[key] = True
return all(found.values())
def __len__(self) -> int:
return len(self._events)