import hashlib
import inspect
import json
import os
import sys
import tempfile
from inspect import _empty # type: ignore
from flask import Blueprint, jsonify, request
from flask.helpers import make_response
import beagle.datasources # noqa: F401
import beagle.transformers # noqa: F401
from beagle.common import logger
from beagle.config import Config
from beagle.datasources.base_datasource import ExternalDataSource
from beagle.backends import NetworkX
from beagle.web.api.models import Graph
from beagle.web.server import db
api = Blueprint("api", __name__, url_prefix="/api")
# Define a mapping between datasource classes to strings
DATASOURCES = {
# Class name is used here.
cls[1].__name__: cls[1]
for cls in inspect.getmembers(
sys.modules["beagle.datasources"],
lambda cls: inspect.isclass(cls) and not inspect.isabstract(cls),
)
}
# Define a mapping between transformer class *names* to class objects
TRANSFORMERS = {
# Human-readable name used here.
cls[1].__name__: cls[1]
for cls in inspect.getmembers(
sys.modules["beagle.transformers"],
lambda cls: inspect.isclass(cls) and not inspect.isabstract(cls),
)
}
# Generate an array containing a description of each datasource.
# This includes it's name, it's id, it's required parameters, and the transformers
# which it can send data to.
SCHEMA = [
{
"id": datasource.__name__,
"name": datasource.name,
"params": [
{
"name": k,
"required": (v.default == _empty),
} # Check if there is a default value, if not, required.
for k, v in inspect.signature(
datasource
).parameters.items() # Gets the expected parameters
],
"type": "external" if issubclass(datasource, ExternalDataSource) else "files",
"transformers": [
{"id": trans.__name__, "name": trans.name} for trans in datasource.transformers
],
}
for datasource in DATASOURCES.values()
]
[docs]@api.route("/datasources")
def pipelines():
"""Returns a list of all available datasources, their parameters,
names, ids, and supported transformers.
A single entry in the array is formatted as follows:
>>> {
"id": str,
"name": str,
"params": [
{
"name": str,
"required": bool,
}
...
],
"transformers": [
{
"id": str,
"name": str
}
]
"type": "files" OR "external
}
If the 'type' field is set to 'files', it means that the parameters
represent required files, if it is set to 'external' this means that the
parameters represent string inputs.
The main purpose of this endpoint is to allow users to query beagle
in order to easily identify what datasource and transformer combinations
are possible, as well as what parameters are required.
Returns
-------
List[dict]
An array of datasource specifications.
"""
response = jsonify(SCHEMA)
response.headers.add("Access-Control-Allow-Origin", "*")
return response
[docs]@api.route("/new", methods=["POST"])
def new():
"""Generate a new graph using the supplied DataSource, Transformer, and the parameters
passed to the DataSource.
At minimum, the user must supply the following form parameters:
1. datasource
2. transformer
3. comment
Outside of that, the user must supply at **minimum** the parameters marked by
the datasource as required.
* Use the /api/datasources endpoint to see which ones these are.
* Programmatically, these are any parameters without a default value.
Failure to supply either the minimum three or the required parameters for that datasource
returns a 400 status code with the missing parameters in the 'message' field.
If any part of the graph creation yields an error, a 500 HTTP code is returend with the
python exception as a string in the 'message' field.
If the graph is succesfully created, the user is returned a dictionary with the ID of the graph
and the URI path to viewing it in the *beagle web interface*.
For example:
>>> {
id: 1,
self: /fireeye_hx/1
}
Returns
-------
dict
{id: integer, self: string}
"""
# Verify we have the basic parameters.
missing_params = []
for param in ["datasource", "transformer", "comment"]:
if param not in request.form:
missing_params.append(param)
if len(missing_params) > 0:
logger.debug(f"Request to /new missing parameters: {missing_params}")
return make_response(jsonify({"message": f"Missing parameters {missing_params}"}), 400)
# Get the
requested_datasource = request.form["datasource"]
requested_transformer = request.form["transformer"]
datasource_schema = next(
filter(lambda entry: entry["id"] == requested_datasource, SCHEMA), None
)
if datasource_schema is None:
logger.debug(f"User requested a non-existent data source {requested_datasource}")
return make_response(
jsonify(
{
"message": f"Requested datasource '{requested_datasource}' is invalid, "
+ "please use /api/datasources to find a list of valid datasources"
}
),
400,
)
logger.info(
f"Recieved upload request for datasource=<{requested_datasource}>, transformer=<{requested_transformer}>"
)
datasource_cls = DATASOURCES[requested_datasource]
transformer_cls = TRANSFORMERS[requested_transformer]
required_parameters = datasource_schema["params"]
# If this class extends the ExternalDataSource class, we know that the parameters
# represent strings, and not files.
is_external = issubclass(datasource_cls, ExternalDataSource)
# Make sure the user provided all required parameters for the datasource.
datasource_missing_params = []
for param in required_parameters:
# Skip missnig parameters
if param["required"] is False:
continue
if is_external and param["name"] not in request.form:
datasource_missing_params.append(param["name"])
if not is_external and param["name"] not in request.files:
datasource_missing_params.append(param["name"])
if len(datasource_missing_params) > 0:
logger.debug(
f"Missing datasource {'form' if is_external else 'files'} params {datasource_missing_params}"
)
return make_response(
jsonify(
{
"message": f"Missing datasource {'form' if is_external else 'files'} params {datasource_missing_params}"
}
),
400,
)
logger.info("Transforming data to a graph.")
try:
if is_external:
# External parameters are in the form
datasource_params = {}
for param in datasource_schema["params"]:
if param["name"] in request.form:
datasource_params[param["name"]] = request.form[param["name"]]
logger.debug(f"ExternalDataSource params received {datasource_params}")
# Generate the graph.
datasource = datasource_cls(**datasource_params)
transformer = datasource.to_transformer(transformer_cls)
graph = NetworkX(
metadata=datasource.metadata(), nodes=transformer.run(), consolidate_edges=True
)
else:
# Non-external is in the files, and gets saved to temporary files.
tempfiles = {}
for param in datasource_schema["params"]:
# Save the files, keep track of which parameter they represent
if param["name"] in request.files:
tempfiles[param["name"]] = tempfile.NamedTemporaryFile()
request.files[param["name"]].save(tempfiles[param["name"]].name)
tempfiles[param["name"]].seek(0)
logger.info(f"Saved uploaded files {tempfiles}")
# Use the temporary files as a
datasource = datasource_cls(
**{param_name: tempfile.name for param_name, tempfile in tempfiles.items()}
)
transformer = datasource.to_transformer(transformer_cls)
graph = NetworkX(
metadata=datasource.metadata(), nodes=transformer.run(), consolidate_edges=True
)
# Clean up temporary files
for _tempfile in tempfiles.values():
_tempfile.close()
# Make the graph
G = graph.graph()
except Exception as e:
logger.critical(f"Failure to generate graph {e}")
if not is_external:
# Clean up temporary files
try:
for _tempfile in tempfiles.values():
_tempfile.close()
except Exception as e:
logger.critical(f"Failure to clean up temporary files after error {e}")
return make_response(jsonify({"message": str(e)}), 500)
logger.info("Finished generating graph")
if len(G.nodes()) == 0:
return make_response(jsonify({"message": f"Graph generation resulted in 0 nodes. "}), 400)
# Take the SHA256 of the contents of the graph.
contents_hash = hashlib.sha256(
json.dumps(graph.to_json(), sort_keys=True).encode("utf-8")
).hexdigest()
# See if we have previously generated this *exact* graph.
existing = Graph.query.filter_by(meta=graph.metadata, sha256=contents_hash).first()
if existing:
logger.info(f"Graph previously generated with id {existing.id}")
response = jsonify({"id": existing.id, "self": f"/{existing.category}/{existing.id}"})
response.headers.add("Access-Control-Allow-Origin", "*")
return response
dest_folder = datasource_cls.category.replace(" ", "_").lower()
# Set up the storage directory.
dest_path = f"{Config.get('storage', 'dir')}/{dest_folder}/{contents_hash}.json"
os.makedirs(f"{Config.get('storage', 'dir')}/{dest_folder}", exist_ok=True)
db_entry = Graph(
sha256=contents_hash,
meta=graph.metadata,
comment=request.form.get("comment", None),
category=dest_folder, # Categories use the lower name!
file_path=f"{contents_hash}.json",
)
db.session.add(db_entry)
db.session.commit()
logger.info(f"Added graph to database with id={db_entry.id}")
json.dump(graph.to_json(), open(dest_path, "w"))
logger.info(f"Saved graph to {dest_path}")
response = jsonify({"id": db_entry.id, "self": f"/{dest_folder}/{db_entry.id}"})
response.headers.add("Access-Control-Allow-Origin", "*")
return response
[docs]@api.route("/categories/")
def get_categories():
"""Returns a list of categories as id, name pairs.
This list is made up of all categories specified in the category field for each
datasource.
>>> {
"id": "vt_sandbox",
"name": "VT Sandbox"
}
Returns
-------
List[dict]
"""
categories = set([source.category for source in DATASOURCES.values()])
response = jsonify(
[{"id": category.replace(" ", "_").lower(), "name": category} for category in categories]
)
response.headers.add("Access-Control-Allow-Origin", "*")
return response
[docs]@api.route("/categories/<string:category>")
def get_category_items(category: str):
"""Returns the set of items that exist in this category, the path to their JSON files, the comment
made on them, as well as their metadata.
>>> {
comment: str,
file_path: str,
id: int,
metadata: Dict[str, Any]
}
Returns 404 if the category is invalid.
Parameters
----------
category : str
The category to fetch data for.
Returns
-------
List[dict]
"""
if category not in set(
[source.category.replace(" ", "_").lower() for source in DATASOURCES.values()]
):
return make_response(jsonify({"message": "Category not found"}), 404)
# Return reversed.
category_data = [graph.to_json() for graph in Graph.query.filter_by(category=category).all()][
::-1
]
response = jsonify(category_data)
response.headers.add("Access-Control-Allow-Origin", "*")
return response
[docs]@api.route("/graph/<int:graph_id>")
def get_graph(graph_id: int):
"""Returns the JSON object for this graph. This is a networkx node_data JSON dump:
>>> {
directed: boolean,
links: [
{...}
],
multigraph: boolean,
nodes: [
{...}
]
}
Returns 404 if the graph is not found.
Parameters
----------
graph_id : int
The graph ID to fetch data for
Returns
-------
Dict
See https://networkx.github.io/documentation/stable/reference/readwrite/generated/networkx.readwrite.json_graph.node_link_graph.html
"""
graph_obj = Graph.query.filter_by(id=graph_id).first()
if not graph_obj:
return make_response(jsonify({"message": "Graph not found"}), 404)
dest_path = f"{Config.get('storage', 'dir')}/{graph_obj.category}/{graph_obj.file_path}"
json_data = json.load(open(dest_path, "r"))
response = jsonify(json_data)
response.headers.add("Access-Control-Allow-Origin", "*")
return response