from collections import defaultdict
import networkx as nx
from typing import Optional
from beagle.common import logger
from beagle.nodes import Node
from beagle.backends.base_backend import Backend
[docs]class NetworkX(Backend):
"""NetworkX based backend. Other backends can subclass this backend in order to have access
to the underlying NetworkX object.
While inserting the Nodes into the graph, the NetworkX object does the following:
1. If the ID of this node (calculated via `Node.__hash__`) is already in the graph, the node is
updated with any properties which are in the new node but not the existing node.
2. If we are inserting the an edge type that already exists between two
nodes `u` and `v`, the edge data is combined.
Notes
---------
In `networkx`, adding the same node twice keeps the latest version of the node. Since
a node that represents the same thing may appear twice in a log (for example, the same
process might appear in a process creation event and a file write event).
It's easier to simply update the nodes as you iterate over the `nodes` attribute.
Parameters
----------
metadata : dict, optional
The metadata from the datasource.
consolidate_edges: boolean, optional
Controls if edges are consolidated. That is, if the edge of type q from u to v happens N times,
should there be one edge from u to v with type q, or should there be N edges.
Notes
-------
Putting
"""
def __init__(
self, metadata: dict = {}, consolidate_edges: bool = False, *args, **kwargs
) -> None:
self.metadata = metadata
self.consolidate_edges = consolidate_edges
self.G = nx.MultiDiGraph(metadata=metadata)
super().__init__(*args, **kwargs)
logger.info("Initialized NetworkX Backend")
[docs] @logger.catch
def graph(self) -> nx.MultiDiGraph:
"""Generates the MultiDiGraph.
Places the nodes in the Graph.
Returns
-------
nx.MultiDiGraph
The generated NetworkX object.
"""
logger.info("Beginning graph generation.")
for node in self.nodes:
node_id = hash(node)
self.insert_node(node, node_id)
logger.info("Completed graph generation.")
logger.info(f"Graph contains {len(self.G.nodes())} nodes and {len(self.G.edges())} edges.")
return self.G
[docs] def insert_node(self, node: Node, node_id: int) -> None:
"""Inserts a node into the graph, as well as all edges outbound from it.
If a node with `node_id` already exists, the node data is updated using
:py:meth:`update_node`.
Parameters
----------
node : Node
Node object to insert
no`de_id : int
The ID of the node (`hash(node)`)
"""
if node_id not in self.G.nodes:
self.G.add_node(node_id, data=node)
else:
self.update_node(node, node_id)
for edge_dict in node.edges:
for dest_node, edge_data in edge_dict.items():
# If there's no data on the edges, insert at least one to represent
# the edge exists
if len(edge_data._events) == 0:
self.insert_edge(
u=node, # Source node
v=dest_node, # Dest Node
edge_name=edge_data.__name__, # Edge name
data=None,
)
else:
# Otherwise, insert all the edge instances.
for entry in getattr(edge_data, "_events", [None]):
self.insert_edge(
u=node, # Source node
v=dest_node, # Dest Node
edge_name=edge_data.__name__, # Edge name
data=entry,
)
[docs] def insert_edge(self, u: Node, v: Node, edge_name: str, data: Optional[dict]) -> None:
"""Insert an edge from `u` to `v` with type `edge_name` that contains data
`data`.
If the edge already exists, the data entry is appended to the existing data
array.
This results in a single edge between `u` and `v` per `edge_name`. And each
occurence of that edge is represented by an entry in the `data` list.
Parameters
----------
u : Node
Source Node object
v : Node
Destination Node object
edge_name : str
Edge Name
data : dict
Data entry to place on this edge.
"""
u_id = hash(u)
v_id = hash(v)
if v_id in self.G.nodes:
self.update_node(v, v_id)
else:
# First time, make an array.
self.G.add_node(v_id, data=v)
# If we consolidate edges, the key is the edge name, and we update the data.
if self.consolidate_edges:
curr = self.G.get_edge_data(u=u_id, v=v_id, key=edge_name, default=None)
if curr is None:
self.G.add_edge(
u_for_edge=u_id,
v_for_edge=v_id,
key=edge_name,
data=([data] if data else []),
edge_name=edge_name,
)
elif data:
curr = curr["data"]
curr.append(data)
nx.set_edge_attributes(
self.G, {(u_id, v_id, edge_name): {"data": curr, "edge_name": edge_name}}
)
# Otherwise, they key is assigned from NetworkX, and we add the edge type as a label:
else:
self.G.add_edge(
u_for_edge=u_id, v_for_edge=v_id, data=([data] if data else []), edge_name=edge_name
)
[docs] def update_node(self, node: Node, node_id: int) -> None:
"""Update the attributes of a node. Since we may see the same Node in multiple events,
we want to have the largest coverage of its attributes.
* See :class:`beagle.nodes.node.Node` for how we determine two nodes are the same.
This method updates the node already in the graph with the newest attributes
from the passed in parameter `Node`
Parameters
----------
node : Node
The Node object to use to update the node already in the graph
node_id : int
The hash of the Node. see :py:meth:`beagle.nodes.node.__hash__`
"""
current_data = self.G.nodes[node_id]["data"]
for key, value in node.__dict__.items():
# NOTE: Skips edge combination because edge data is
# added anyway in self.insert_node()
if isinstance(value, defaultdict):
continue
# Always use the latest value.
if value:
setattr(current_data, key, value)
nx.set_node_attributes(self.G, {node_id: {"data": current_data}})
[docs] def to_json(self) -> dict:
"""Convert the graph to JSON, which can later be used be read in using
networkx::
>>> backend = NetworkX(nodes=nodes)
>>> G = backend.graph()
>>> data = G.to_json()
>>> parsed = networkx.readwrite.json_graph.node_link_graph(data)
Returns
-------
dict
node_link compatible version of the graph.
"""
def node_to_json(node_id: int, node: Node) -> dict:
return {
"id": node_id,
"properties": node.to_dict(),
"_node_type": node.__name__,
"_display": node._display,
"_color": node.__color__,
}
def edge_to_json(edge_id: int, u: int, v: int, edge_key: str, edge_props: dict) -> dict:
return {
"id": edge_id,
"source": u,
"target": v,
"type": edge_props["edge_name"],
"properties": {"data": edge_props["data"]},
}
relationships = [
edge_to_json(
index + 1, # Unique ID based on index.
edge[0], # Source node (u)
edge[1], # Destination node (v)
edge[2], # Edge type (e.g "wrote")
edge[3], # Edge data
)
for index, edge in enumerate(self.G.edges(data=True, keys=True))
]
nodes = [
node_to_json(node, node_data["data"]) for node, node_data in self.G.nodes(data=True)
]
return {
"directed": self.G.is_directed(),
"multigraph": self.G.is_multigraph(),
"nodes": nodes,
"links": relationships,
}