import hashlib
import inspect
import json
import os
import sys
import tempfile
from inspect import _empty # type: ignore
from typing import Any, Dict, List, Tuple, Type, cast
from flask import Blueprint, jsonify, request
from flask.helpers import make_response
import beagle.datasources # noqa: F401
import beagle.transformers # noqa: F401
from beagle.backends import Backend
from beagle.backends.networkx import NetworkX
from beagle.common import logger
from beagle.config import Config
from beagle.datasources import DataSource
from beagle.datasources.base_datasource import ExternalDataSource
from beagle.datasources.json_data import JSONData
from beagle.transformers import Transformer
from beagle.web.api.models import Graph
from beagle.web.server import db
api = Blueprint("api", __name__, url_prefix="/api")
# Define a mapping between datasource classes to strings
DATASOURCES = {
# Class name is used here.
cls[1].__name__: cls[1]
for cls in inspect.getmembers(
sys.modules["beagle.datasources"],
lambda cls: inspect.isclass(cls) and not inspect.isabstract(cls),
)
}
# Define a mapping between transformer class *names* to class objects
TRANSFORMERS = {
# Human-readable name used here.
cls[1].__name__: cls[1]
for cls in inspect.getmembers(
sys.modules["beagle.transformers"],
lambda cls: inspect.isclass(cls) and not inspect.isabstract(cls),
)
}
BACKENDS = {
cls[1].__name__: cls[1]
for cls in inspect.getmembers(
sys.modules["beagle.backends"],
lambda cls: inspect.isclass(cls) and not inspect.isabstract(cls),
)
}
# Generate an array containing a description of each datasource.
# This includes it's name, it's id, it's required parameters, and the transformers
# which it can send data to.
SCHEMA = {
"datasources": [
{
"id": datasource.__name__,
"name": datasource.name,
"params": [
{
"name": k,
"required": (v.default == _empty),
"default": None if v.default == _empty else v.default,
} # Check if there is a default value, if not, required.
for k, v in inspect.signature(
datasource
).parameters.items() # Gets the expected parameters
],
"type": "external" if issubclass(datasource, ExternalDataSource) else "files",
"transformers": [
{"id": trans.__name__, "name": trans.name} for trans in datasource.transformers
],
}
for datasource in DATASOURCES.values()
],
"backends": [
{"id": backend.__name__, "name": backend.__name__} for backend in BACKENDS.values()
],
}
[docs]@api.route("/datasources")
def pipelines(): # pragma: no cover
"""Returns a list of all available datasources, their parameters,
names, ids, and supported transformers.
A single entry in the array is formatted as follows:
>>> {
"id": str,
"name": str,
"params": [
{
"name": str,
"required": bool,
}
...
],
"transformers": [
{
"id": str,
"name": str
}
]
"type": "files" OR "external
}
If the 'type' field is set to 'files', it means that the parameters
represent required files, if it is set to 'external' this means that the
parameters represent string inputs.
The main purpose of this endpoint is to allow users to query beagle
in order to easily identify what datasource and transformer combinations
are possible, as well as what parameters are required.
Returns
-------
List[dict]
An array of datasource specifications.
"""
response = jsonify(SCHEMA)
return response
[docs]@api.route("/backends")
def get_backends(): # pragma: no cover
"""Returns all possible backends, their names, and their IDs.
The array contains elements with the following structure.
>>> {
id: string, # class name
name: string # Human-readable name
}
These map back to the __name__ attributes of Backend subclasses.
Returns
-------
List[dict]
Array of {id: string, name: string} entries.
"""
response = jsonify(
[{"id": backend.__name__, "name": backend.__name__} for backend in BACKENDS.values()]
)
return response
[docs]@api.route("/new", methods=["POST"])
def new():
"""Generate a new graph using the supplied DataSource, Transformer, and the parameters
passed to the DataSource.
At minimum, the user must supply the following form parameters:
1. datasource
2. transformer
3. comment
4. backend
Outside of that, the user must supply at **minimum** the parameters marked by
the datasource as required.
* Use the /api/datasources endpoint to see which ones these are.
* Programmatically, these are any parameters without a default value.
Failure to supply either the minimum three or the required parameters for that datasource
returns a 400 status code with the missing parameters in the 'message' field.
If any part of the graph creation yields an error, a 500 HTTP code is returend with the
python exception as a string in the 'message' field.
If the graph is succesfully created, the user is returned a dictionary with the ID of the graph
and the URI path to viewing it in the *beagle web interface*.
For example:
>>> {
id: 1,
self: /fireeye_hx/1
}
Returns
-------
dict
{id: integer, self: string}
"""
# Returns a tuple of (dict, bool).
resp, success = _validate_params(form=request.form, files=request.files)
# If false, return error message
if not success:
return make_response(jsonify(resp), 400)
datasource_cls: Type[DataSource] = resp["datasource"]
transformer_cls: Type[Transformer] = resp["transformer"]
backend_cls: Type[Backend] = resp["backend"]
datasource_schema = resp["schema"]
# If this class extends the ExternalDataSource class, we know that the parameters
# represent strings, and not files.
is_external = issubclass(datasource_cls, ExternalDataSource)
logger.info(
f"Recieved upload request for datasource=<{datasource_cls.__name__}>, "
+ f"transformer=<{transformer_cls.__name__}>, backend=<{backend_cls.__name__}>"
)
logger.info("Transforming data to a graph.")
params = _setup_params(form=request.form, schema=datasource_schema, is_external=is_external)
resp, success = _create_graph(
datasource_cls=datasource_cls,
transformer_cls=transformer_cls,
backend_cls=backend_cls,
params=params,
is_external=is_external,
)
if not success:
return make_response(jsonify(resp), 400)
G = resp["graph"]
# If the backend is NetworkX, save the graph.
# Otherwise, redirect the user to wherever he sent it (if possible)
if backend_cls.__name__ == "NetworkX":
response = _save_graph_to_db(backend=resp["backend"], category=datasource_cls.category)
response = jsonify(response)
else:
logger.debug(G)
response = jsonify({"resp": G})
return response
[docs]@api.route("/add/<int:graph_id>", methods=["POST"])
def add(graph_id: int):
"""Add data to an existing NetworkX based graph.
Parameters
----------
graph_id : int
The graph ID to add to.
"""
graph_obj = Graph.query.filter_by(id=graph_id).first()
if not graph_obj:
return make_response(jsonify({"message": "Graph not found"}), 404)
# Validate the parameters are valid.
# Returns a tuple of (dict, bool).
resp, success = _validate_params(form=request.form, files=request.files)
# If false, return error message
if not success:
return make_response(jsonify(resp), 400)
datasource_cls: Type[DataSource] = resp["datasource"]
transformer_cls: Type[Transformer] = resp["transformer"]
backend_cls: Type[Backend] = resp["backend"]
is_external = issubclass(datasource_cls, ExternalDataSource)
# Only NetworkX for now.
if backend_cls.__name__ != "NetworkX":
logger.info("Cannot append to non NetworkX graphs for now.")
return make_response(jsonify({"message": "Can only add to NetworkX Graphs for now."}), 400)
# Cast to NetworkX
backend_cls = cast(Type[NetworkX], backend_cls)
datasource_schema = resp["schema"]
# If this class extends the ExternalDataSource class, we know that the parameters
# represent strings, and not files.
logger.info(
f"Recieved add data request for existing graph=<{graph_id}>"
+ f"datasource=<{datasource_cls.__name__}>, "
+ f"transformer=<{transformer_cls.__name__}>, backend=<{backend_cls.__class__.__name__}>"
)
params = _setup_params(form=request.form, schema=datasource_schema, is_external=is_external)
# NOTE: This will all need to change for support non NetworkX backends.
# Get the existing graph as JSON
dest_path = f"{Config.get('storage', 'dir')}/{graph_obj.category}/{graph_obj.file_path}"
json_data = json.load(open(dest_path, "r"))
# Make a dummy backend instance
backend_instance = backend_cls(nodes=[], consolidate_edges=True)
existing_graph = backend_cls.from_json(json_data)
# Set the graph
backend_instance.G = existing_graph
resp, success = _add_to_exiting_graph(
existing_backend=backend_instance,
datasource_cls=datasource_cls,
transformer_cls=transformer_cls,
params=params,
is_external=is_external,
)
if not success:
return make_response(jsonify(resp), 400)
# Save the existing graph object to disk.
resp = _save_graph_to_db(
backend=backend_instance,
# Use the existing category.
category=graph_obj.category,
# Graph ID
graph_id=graph_obj.id,
)
return make_response(jsonify(resp), 200)
def _validate_params(form: dict, files: dict) -> Tuple[dict, bool]:
"""Validates that the passed in parameters are valid. Test for the following:
1. Datasource, comment, and transformer all passed in (backend is optional).
2. For the datasource requested, all of the parameters to the datasource are present.
Parameters
----------
form : dict
The HTTP form sent
files : dict
The files sent along the form, if any
Returns
-------
Tuple[dict, bool]
Return (error message, False) if not valid, otherwise (config, True)
"""
# Verify we have the basic parameters.
missing_params = []
for req_param in ["datasource", "transformer", "comment"]:
if req_param not in form:
missing_params.append(req_param)
if len(missing_params) > 0:
logger.debug(f"Request to /new missing parameters: {missing_params}")
return ({"message": f"Missing parameters {missing_params}"}, False)
# Pull out the requested datasource/transformer.
requested_datasource = form["datasource"]
requested_transformer = form["transformer"]
# Backend is optional
requested_backend = form.get("backend", "NetworkX")
datasource_schema = next(
filter(lambda entry: entry["id"] == requested_datasource, SCHEMA["datasources"]), None
)
if datasource_schema is None:
logger.debug(f"User requested a non-existent data source {requested_datasource}")
resp = {
"message": f"Requested datasource '{requested_datasource}' is invalid, "
+ "please use /api/datasources to find a list of valid datasources"
}
return (resp, False)
datasource_cls = DATASOURCES[requested_datasource]
transformer_cls = TRANSFORMERS[requested_transformer]
backend_cls = BACKENDS[requested_backend]
required_params: List[Dict[str, Any]] = datasource_schema["params"]
is_external = issubclass(datasource_cls, ExternalDataSource)
# Make sure the user provided all required parameters for the datasource.
datasource_missing_params = []
for param in required_params:
# Skip missing parameters
if param["required"] is False:
continue
if is_external and param["name"] not in form:
datasource_missing_params.append(param["name"])
if not is_external and param["name"] not in files:
datasource_missing_params.append(param["name"])
if len(datasource_missing_params) > 0:
logger.debug(
f"Missing datasource {'form' if is_external else 'files'} params {datasource_missing_params}"
)
resp = {
"message": f"Missing datasource {'form' if is_external else 'files'} params {datasource_missing_params}"
}
return (resp, False)
return (
{
"datasource": datasource_cls,
"transformer": transformer_cls,
"backend": backend_cls,
"schema": datasource_schema,
"required_params": required_params,
},
True,
)
def _setup_params(form: dict, schema: dict, is_external: bool) -> dict:
logger.debug("Setting up parameters")
params: Dict[str, Any] = {}
if is_external:
# External parameters are in the form
params = {}
for param in schema["params"]:
if param["name"] in request.form:
params[param["name"]] = request.form[param["name"]]
logger.info(f"ExternalDataSource params received {params}")
else:
for param in schema["params"]:
# Save the files, keep track of which parameter they represent
if param["name"] in request.files:
params[param["name"]] = tempfile.NamedTemporaryFile()
request.files[param["name"]].save(params[param["name"]].name)
params[param["name"]].seek(0)
logger.info(f"Saved uploaded files {params}")
logger.debug("Set up parameters")
return params
def _create_graph(
datasource_cls: Type[DataSource],
transformer_cls: Type[Transformer],
backend_cls: Type[Backend],
params: Dict[str, Any],
is_external: bool,
) -> Tuple[dict, bool]:
try:
# Set up parameters for datasource class
datasource_params = (
# Use filenames if we are referencing a temporary file
{param_name: tempfile.name for param_name, tempfile in params.items()}
if not is_external
else params
)
# Create the datasource
datasource = datasource_cls(**datasource_params) # type: ignore
# Create transformer
transformer = datasource.to_transformer(transformer_cls)
# Create the nodes
nodes = transformer.run()
# Create the backend
backend_instance = backend_cls( # type: ignore
metadata=datasource.metadata(), nodes=nodes, consolidate_edges=True
)
# Make the graph
G = backend_instance.graph()
except Exception as e:
logger.critical(f"Failure to generate graph {e}")
import traceback
logger.debug(f"{traceback.format_exc()}")
if not is_external:
# Clean up temporary files
try:
for _tempfile in params.values():
_tempfile.close()
except Exception as e:
logger.critical(f"Failure to clean up temporary files after error {e}")
return {"message": str(e)}, False # type: ignore
logger.info("Cleaning up tempfiles")
if not is_external:
# Clean up temporary files
for _tempfile in params.values():
_tempfile.close()
logger.info("Finished generating graph")
# Check if we even had a graph.
# This will be on the G attribute for any class subclassing NetworkX
if backend_instance.is_empty():
return {"message": f"Graph generation resulted in 0 nodes. "}, False
return {"graph": G, "backend": backend_instance}, True
def _add_to_exiting_graph(
existing_backend: Backend,
datasource_cls: Type[DataSource],
transformer_cls: Type[Transformer],
params: Dict[str, Any],
is_external: bool,
) -> Tuple[dict, bool]:
try:
# Set up parameters for datasource class
datasource_params = (
# Use filenames if we are referencing a temporary file
{param_name: tempfile.name for param_name, tempfile in params.items()}
if not is_external
else params
)
# Create the datasource
datasource = datasource_cls(**datasource_params) # type: ignore
# Create transformer
transformer = datasource.to_transformer(transformer_cls)
# Create the nodes
nodes = transformer.run()
# Create the backend
G = existing_backend.add_nodes(nodes)
except Exception as e:
logger.critical(f"Failure to generate graph {e}")
import traceback
logger.debug(f"{traceback.format_exc()}")
if not is_external:
# Clean up temporary files
try:
for _tempfile in params.values():
_tempfile.close()
except Exception as e:
logger.critical(f"Failure to clean up temporary files after error {e}")
return {"message": str(e)}, False
logger.info("Cleaning up tempfiles")
if not is_external:
# Clean up temporary files
for _tempfile in params.values():
_tempfile.close()
logger.info("Finished generating graph")
# Check if we even had a graph.
# This will be on the G attribute for any class subclassing NetworkX
if existing_backend.is_empty():
return {"message": f"Graph generation resulted in 0 nodes."}, False
return {"graph": G, "backend": existing_backend}, True
def _save_graph_to_db(backend: NetworkX, category: str, graph_id: int = None) -> dict:
"""Saves a graph to the database, optionally forcing an overwrite of an existing graph.
Parameters
----------
backend : NetworkX
The NetworkX object to save
category : str
The category
graph_id: int
The graph ID to override.
Returns
-------
dict
JSON to return to client with ID and path.
"""
# Take the SHA256 of the contents of the graph.
contents_hash = hashlib.sha256(
json.dumps(backend.to_json(), sort_keys=True).encode("utf-8")
).hexdigest()
# See if we have previously generated this *exact* graph.
existing = Graph.query.filter_by(meta=backend.metadata, sha256=contents_hash).first()
if existing:
logger.info(f"Graph previously generated with id {existing.id}")
return {"id": existing.id, "self": f"/{existing.category}/{existing.id}"}
dest_folder = category.replace(" ", "_").lower()
# Set up the storage directory.
dest_path = f"{Config.get('storage', 'dir')}/{dest_folder}/{contents_hash}.json"
os.makedirs(f"{Config.get('storage', 'dir')}/{dest_folder}", exist_ok=True)
json.dump(backend.to_json(), open(dest_path, "w"))
if graph_id:
db_entry = Graph.query.filter_by(id=graph_id).first()
# set the new hash.
db_entry.file_path = f"{contents_hash}.json"
db_entry.sha256 = contents_hash
# NOTE: Old path is not deleted.
else:
db_entry = Graph(
sha256=contents_hash,
meta=backend.metadata,
comment=request.form.get("comment", None),
category=dest_folder, # Categories use the lower name!
file_path=f"{contents_hash}.json",
)
# Add new entry
db.session.add(db_entry)
db.session.commit()
logger.info(f"Added graph to database with id={db_entry.id}")
logger.info(f"Saved graph to {dest_path}")
return {"id": db_entry.id, "self": f"/{dest_folder}/{db_entry.id}"}
[docs]@api.route("/adhoc", methods=["POST"])
def adhoc():
"""Allows for ad-hoc transformation of generic JSON Data based on one of two CIM models:
1. The Beagle CIM Model (defined in `constants.py`)
2. The OSSEM Model (defined in https://github.com/Cyb3rWard0g/OSSEM)
"""
valid_cim_formats = ["beagle"]
data = request.get_json()
events = data["data"]
cim_format = data.get("cim", "beagle")
if str(cim_format).lower() not in valid_cim_formats:
response = jsonify({"message": f"cim_format must be in {cim_format}"})
return response
if not isinstance(events, list):
events = [events]
logger.info(f"Beginning ad-hoc graphing request")
g = JSONData(events).to_graph(consolidate_edges=True)
logger.info(f"Completed ad-hoc graphing request")
return jsonify({"data": NetworkX.graph_to_json(g)})
[docs]@api.route("/categories/")
@api.route("/categories")
def get_categories():
"""Returns a list of categories as id, name pairs.
This list is made up of all categories specified in the category field for each
datasource.
>>> {
"id": "vt_sandbox",
"name": "VT Sandbox"
}
Returns
-------
List[dict]
"""
categories = set([source.category for source in DATASOURCES.values()])
resp = [{"id": category.replace(" ", "_").lower(), "name": category} for category in categories]
# Show only the responses we upload
if request.args.get("uploaded"):
categories = [value[0] for value in db.session.query(Graph.category).distinct()]
# Filter out the ones we don't have
resp = list(filter(lambda entry: entry["id"] in categories, resp))
response = jsonify(resp)
return response
[docs]@api.route("/categories/<string:category>")
def get_category_items(category: str): # pragma: no cover
"""Returns the set of items that exist in this category, the path to their JSON files, the comment
made on them, as well as their metadata.
>>> {
comment: str,
file_path: str,
id: int,
metadata: Dict[str, Any]
}
Returns 404 if the category is invalid.
Parameters
----------
category : str
The category to fetch data for.
Returns
-------
List[dict]
"""
if category not in set(
[source.category.replace(" ", "_").lower() for source in DATASOURCES.values()]
):
return make_response(jsonify({"message": "Category not found"}), 404)
# Return reversed.
category_data = [graph.to_json() for graph in Graph.query.filter_by(category=category).all()]
category_data = category_data[::-1]
response = jsonify(category_data)
return response
[docs]@api.route("/graph/<int:graph_id>")
def get_graph(graph_id: int): # pragma: no cover - hard to test due to building path.
"""Returns the JSON object for this graph. This is a networkx node_data JSON dump:
>>> {
directed: boolean,
links: [
{...}
],
multigraph: boolean,
nodes: [
{...}
]
}
Returns 404 if the graph is not found.
Parameters
----------
graph_id : int
The graph ID to fetch data for
Returns
-------
Dict
See https://networkx.github.io/documentation/stable/reference/readwrite/generated/networkx.readwrite.json_graph.node_link_graph.html
"""
graph_obj = Graph.query.filter_by(id=graph_id).first()
if not graph_obj:
return make_response(jsonify({"message": "Graph not found"}), 404)
dest_path = f"{Config.get('storage', 'dir')}/{graph_obj.category}/{graph_obj.file_path}"
json_data = json.load(open(dest_path, "r"))
response = jsonify(json_data)
return response