Nettacker/nettacker/core/graph.py

404 lines
14 KiB
Python

import csv
import html
import importlib
import json
import os
import uuid
from datetime import datetime
from pathlib import Path
import texttable
from nettacker import logger, all_module_severity_and_desc
from nettacker.config import Config, version_info
from nettacker.core.die import die_failure
from nettacker.core.messages import messages as _
from nettacker.core.utils.common import (
merge_logs_to_list,
now,
sanitize_path,
generate_compare_filepath,
)
from nettacker.database.db import get_logs_by_scan_id, submit_report_to_db, get_options_by_scan_id
log = logger.get_logger()
nettacker_path_config = Config.path
def build_graph(graph_name, events):
"""
build a graph
Args:
graph_name: graph name
events: list of events
Returns:
graph in HTML type
"""
log.info(_("build_graph"))
try:
start = getattr(
importlib.import_module(
f"nettacker.lib.graph.{graph_name.rsplit('_graph')[0]}.engine"
),
"start",
)
except ModuleNotFoundError:
die_failure(_("graph_module_unavailable").format(graph_name))
log.info(_("finish_build_graph"))
return start(events)
def build_compare_report(compare_results):
"""
build the compare report
Args:
compare_results: Final result of the comparision(dict)
Returns:
report in html format
"""
log.info(_("build_compare_report"))
try:
build_report = getattr(
importlib.import_module("nettacker.lib.compare_report.engine"),
"build_report",
)
except ModuleNotFoundError:
die_failure(_("graph_module_unavailable").format("compare_report"))
log.info(_("finish_build_report"))
return build_report(compare_results)
def build_text_table(events):
"""
value['date'], value["target"], value['module_name'], value['scan_id'],
value['options'], value['event']
build a text table with generated event related to the scan
:param events: all events
:return:
array [text table, event_number]
"""
_table = texttable.Texttable()
table_headers = ["date", "target", "module_name", "port", "logs"]
_table.add_rows([table_headers])
for event in events:
log = merge_logs_to_list(json.loads(event["json_event"]), [])
_table.add_rows(
[
table_headers,
[
event["date"],
event["target"],
event["module_name"],
str(event["port"]),
"\n".join(log) if log else "Detected",
],
]
)
return (
_table.draw()
+ "\n\n"
+ _("nettacker_version_details").format(version_info()[0], version_info()[1], now())
+ "\n"
)
def create_compare_text_table(results):
table = texttable.Texttable()
table_headers = list(results.keys())
table.add_rows([table_headers])
table.add_rows(
[
table_headers,
[results[col] for col in table_headers],
]
)
table.set_cols_width([len(i) for i in table_headers])
return table.draw() + "\n\n"
def create_dd_specific_json(all_scan_logs):
severity_mapping = {1: "Info", 2: "Low", 3: "Medium", 4: "High", 5: "Critical"}
findings = []
for log in all_scan_logs:
module_name = log["module_name"].strip()
date = datetime.strptime(log["date"], "%Y-%m-%d %H:%M:%S.%f").strftime("%m/%d/%Y")
port = str(log.get("port", "")).strip()
impact = log.get("event", "").strip()
severity_justification = log.get("json_event", "").strip()
service = log.get("target", "").strip()
unique_id = log.get("scan_id", uuid.uuid4().hex)
metadata = all_module_severity_and_desc.get(module_name, {})
severity_raw = metadata.get("severity", 0)
description = metadata.get("desc", "")
if severity_raw >= 9:
severity = severity_mapping[5]
elif severity_raw >= 7:
severity = severity_mapping[4]
elif severity_raw >= 4:
severity = severity_mapping[3]
elif severity_raw > 0:
severity = severity_mapping[2]
else:
severity = severity_mapping[1]
findings.append(
{
"date": date,
"title": module_name,
"description": description.strip(),
"severity": severity,
"param": port,
"impact": impact,
"severity_justification": severity_justification,
"service": service,
"unique_id_from_tool": unique_id,
"static_finding": False,
"dynamic_finding": True,
}
)
return json.dumps({"findings": findings}, indent=4)
def create_sarif_report(all_scan_logs):
"""
Takes all_scan_logs and converts them to a SARIF based json
format. The schema and version used are 2.1.0 linked below.
The following conversions are made:
ruleId: name of the module
message: event value for each log in all_scan_logs
locations.physicalLocations.artifactLocation.uri: target value
webRequest.properties.json_event: json_event value for each log in all_scan_logs
properties.scan_id: scan_id unique value for each run
properties.date: date field specified in all_scan_logs
"""
sarif_structure = {
"$schema": "https://json.schemastore.org/sarif-2.1.0.json",
"version": "2.1.0",
"runs": [
{
"tool": {
"driver": {
"name": "Nettacker",
"version": "0.4.0",
"informationUri": "https://github.com/OWASP/Nettacker",
}
},
"results": [],
}
],
}
for log in all_scan_logs:
sarif_result = {
"ruleId": log["module_name"],
"message": {"text": log["event"]},
"locations": [{"physicalLocation": {"artifactLocation": {"uri": log["target"]}}}],
"properties": {
"scan_id": log["scan_id"],
"date": log["date"],
"json_event": log["json_event"],
},
}
sarif_structure["runs"][0]["results"].append(sarif_result)
return json.dumps(sarif_structure, indent=2)
def create_report(options, scan_id):
"""
sort all events, create log file in HTML/TEXT/JSON and remove old logs
Args:
options: parsing options
scan_id: scan unique id
Returns:
True if success otherwise None
"""
all_scan_logs = get_logs_by_scan_id(scan_id)
if not all_scan_logs:
log.info(_("no_events_for_report"))
return True
report_path_filename = options.report_path_filename
if (len(report_path_filename) >= 5 and report_path_filename[-5:] == ".html") or (
len(report_path_filename) >= 4 and report_path_filename[-4:] == ".htm"
):
if options.graph_name:
html_graph = build_graph(options.graph_name, all_scan_logs)
else:
html_graph = ""
from nettacker.lib.html_log import log_data
html_table_content = log_data.table_title.format(
html_graph,
log_data.css_1,
"date",
"target",
"module_name",
"port",
"logs",
"json_event",
)
index = 1
for event in all_scan_logs:
log_list = merge_logs_to_list(json.loads(event["json_event"]), [])
html_table_content += log_data.table_items.format(
event["date"],
event["target"],
event["module_name"],
event["port"],
"<br>".join(log_list) if log_list else "Detected", # event["event"], #log
index,
html.escape(event["json_event"]),
)
index += 1
html_table_content += (
log_data.table_end
+ '<div id="json_length">'
+ str(index - 1)
+ "</div>"
+ '<p class="footer">'
+ _("nettacker_version_details").format(version_info()[0], version_info()[1], now())
+ " ScanID: {0}".format(scan_id)
+ "</p>"
+ log_data.json_parse_js
)
with Path(report_path_filename).open("w", encoding="utf-8") as report_file:
report_file.write(html_table_content + "\n")
elif len(report_path_filename) >= 5 and report_path_filename[-8:].lower() == ".dd.json":
with Path(report_path_filename).open("w", encoding="utf-8") as report_file:
dd_content_json = create_dd_specific_json(all_scan_logs)
report_file.write(dd_content_json + "\n")
elif len(report_path_filename) >= 5 and report_path_filename[-5:] == ".json":
with Path(report_path_filename).open("w", encoding="utf-8") as report_file:
report_file.write(str(json.dumps(all_scan_logs)) + "\n")
elif len(report_path_filename) >= 6 and report_path_filename[-6:].lower() == ".sarif":
with Path(report_path_filename).open("w", encoding="utf-8") as report_file:
sarif_content = create_sarif_report(all_scan_logs)
report_file.write(sarif_content + "\n")
elif len(report_path_filename) >= 5 and report_path_filename[-4:] == ".csv":
keys = all_scan_logs[0].keys()
with Path(report_path_filename).open("a") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=keys)
writer.writeheader()
for log_list in all_scan_logs:
dict_data = {key: value for key, value in log_list.items() if key in keys}
writer.writerow(dict_data)
else:
with Path(report_path_filename).open("w", encoding="utf-8") as report_file:
report_file.write(build_text_table(all_scan_logs))
log.write(build_text_table(all_scan_logs))
submit_report_to_db(
{
"date": datetime.now(),
"scan_id": scan_id,
"options": vars(options),
}
)
log.info(_("file_saved").format(report_path_filename))
return True
def create_compare_report(options, scan_id):
"""
if compare_id is given then create the report of comparision b/w scans
Args:
options: parsing options
scan_id: scan unique id
Returns:
True if success otherwise None
"""
comp_id = options["scan_compare_id"] if isinstance(options, dict) else options.scan_compare_id
scan_log_curr = get_logs_by_scan_id(scan_id)
scan_logs_comp = get_logs_by_scan_id(comp_id)
if not scan_log_curr:
log.info(_("no_events_for_report"))
return None
if not scan_logs_comp:
log.info(_("no_scan_to_compare"))
return None
scan_opts_curr = get_options_by_scan_id(scan_id)
scan_opts_comp = get_options_by_scan_id(comp_id)
def get_targets_set(item):
return tuple(json.loads(item["options"])["targets"])
curr_target_set = set(get_targets_set(item) for item in scan_opts_curr)
comp_target_set = set(get_targets_set(item) for item in scan_opts_comp)
def get_modules_ports(item):
return (item["target"], item["module_name"], item["port"])
curr_modules_ports = set(get_modules_ports(item) for item in scan_log_curr)
comp_modules_ports = set(get_modules_ports(item) for item in scan_logs_comp)
compare_results = {
"curr_scan_details": (scan_id, scan_log_curr[0]["date"]),
"comp_scan_details": (comp_id, scan_logs_comp[0]["date"]),
"curr_target_set": tuple(curr_target_set),
"comp_target_set": tuple(comp_target_set),
"curr_scan_result": tuple(curr_modules_ports),
"comp_scan_result": tuple(comp_modules_ports),
"new_targets_discovered": tuple(curr_modules_ports - comp_modules_ports),
"old_targets_not_detected": tuple(comp_modules_ports - curr_modules_ports),
}
if isinstance(options, dict):
compare_report_path_filename = options["compare_report_path_filename"]
else:
compare_report_path_filename = (
options.compare_report_path_filename
if len(options.compare_report_path_filename) != 0
else generate_compare_filepath(scan_id)
)
base_path = str(nettacker_path_config.results_dir)
compare_report_path_filename = sanitize_path(compare_report_path_filename)
fullpath = os.path.normpath(os.path.join(base_path, compare_report_path_filename))
if not fullpath.startswith(base_path):
raise PermissionError
if (len(fullpath) >= 5 and fullpath[-5:] == ".html") or (
len(fullpath) >= 4 and fullpath[-4:] == ".htm"
):
html_report = build_compare_report(compare_results)
with Path(fullpath).open("w", encoding="utf-8") as compare_report:
compare_report.write(html_report + "\n")
elif len(fullpath) >= 5 and fullpath[-5:] == ".json":
with Path(fullpath).open("w", encoding="utf-8") as compare_report:
compare_report.write(str(json.dumps(compare_results)) + "\n")
elif len(fullpath) >= 5 and fullpath[-4:] == ".csv":
keys = compare_results.keys()
with Path(fullpath).open("a") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=keys)
if csvfile.tell() == 0:
writer.writeheader()
writer.writerow(compare_results)
else:
with Path(fullpath).open("w", encoding="utf-8") as compare_report:
compare_report.write(create_compare_text_table(compare_results))
log.write(create_compare_text_table(compare_results))
log.info(_("compare_report_saved").format(fullpath))
return True