import csv
import html
import importlib
import json
import os
import uuid
from datetime import datetime
from pathlib import Path
import texttable
from nettacker import logger, all_module_severity_and_desc
from nettacker.config import Config, version_info
from nettacker.core.die import die_failure
from nettacker.core.messages import messages as _
from nettacker.core.utils.common import (
merge_logs_to_list,
now,
sanitize_path,
generate_compare_filepath,
)
from nettacker.database.db import get_logs_by_scan_id, submit_report_to_db, get_options_by_scan_id
# Module-level logger shared by every report-building helper below.
log = logger.get_logger()
# Shortcut to Nettacker's configured filesystem paths.
nettacker_path_config = Config.path
def build_graph(graph_name, events):
    """
    Render scan events as an HTML graph using the named graph engine.

    Args:
        graph_name: graph engine name (e.g. "d3_tree_v2_graph")
        events: list of scan events to visualize

    Returns:
        graph in HTML type
    """
    log.info(_("build_graph"))
    # "xxx_graph" -> "xxx"; the engine lives at nettacker.lib.graph.xxx.engine
    engine_module = f"nettacker.lib.graph.{graph_name.rsplit('_graph')[0]}.engine"
    try:
        engine = importlib.import_module(engine_module)
        start = engine.start
    except ModuleNotFoundError:
        die_failure(_("graph_module_unavailable").format(graph_name))
    log.info(_("finish_build_graph"))
    return start(events)
def build_compare_report(compare_results):
    """
    Render the scan comparison results as an HTML report.

    Args:
        compare_results: final result of the comparison (dict)

    Returns:
        report in html format
    """
    log.info(_("build_compare_report"))
    try:
        engine = importlib.import_module("nettacker.lib.compare_report.engine")
        build_report = engine.build_report
    except ModuleNotFoundError:
        die_failure(_("graph_module_unavailable").format("compare_report"))
    log.info(_("finish_build_report"))
    return build_report(compare_results)
def build_text_table(events):
    """
    Build a plain-text table of the events generated by a scan.

    Each row carries the event date, target, module name, port and the
    merged log lines (or "Detected" when the event has no log entries).

    Args:
        events: list of event dicts with keys "date", "target",
            "module_name", "port" and "json_event" (JSON-encoded logs)

    Returns:
        the rendered table followed by the Nettacker version footer
    """
    _table = texttable.Texttable()
    table_headers = ["date", "target", "module_name", "port", "logs"]
    # Set the header exactly once; the previous code re-added the same
    # header row on every iteration, resetting it redundantly.
    _table.header(table_headers)
    for event in events:
        # Named "event_logs" (not "log") so the module-level logger
        # is not shadowed inside this function.
        event_logs = merge_logs_to_list(json.loads(event["json_event"]), [])
        _table.add_row(
            [
                event["date"],
                event["target"],
                event["module_name"],
                str(event["port"]),
                "\n".join(event_logs) if event_logs else "Detected",
            ]
        )
    return (
        _table.draw()
        + "\n\n"
        + _("nettacker_version_details").format(version_info()[0], version_info()[1], now())
        + "\n"
    )
def create_compare_text_table(results):
    """
    Render a one-row text table from a compare-result mapping.

    Args:
        results: mapping of column name -> value

    Returns:
        the drawn table followed by a blank line
    """
    headers = list(results.keys())
    row = [results[name] for name in headers]
    table = texttable.Texttable()
    table.add_rows([headers])
    table.add_rows([headers, row])
    # Width each column to exactly fit its header text.
    table.set_cols_width([len(name) for name in headers])
    return table.draw() + "\n\n"
def create_dd_specific_json(all_scan_logs):
    """
    Convert scan logs into a DefectDojo-compatible findings JSON document.

    The numeric module severity (0-10 scale, looked up per module in
    ``all_module_severity_and_desc``) is bucketed into the DefectDojo
    labels Info/Low/Medium/High/Critical.

    Args:
        all_scan_logs: list of scan log dicts ("module_name", "date",
            "port", "event", "json_event", "target", "scan_id")

    Returns:
        a JSON string of the form {"findings": [...]}
    """
    severity_mapping = {1: "Info", 2: "Low", 3: "Medium", 4: "High", 5: "Critical"}
    findings = []
    # Iterate as "scan_log" (not "log") to avoid shadowing the module logger.
    for scan_log in all_scan_logs:
        module_name = scan_log["module_name"].strip()
        # Re-format the stored timestamp into DefectDojo's MM/DD/YYYY date.
        date = datetime.strptime(scan_log["date"], "%Y-%m-%d %H:%M:%S.%f").strftime("%m/%d/%Y")
        port = str(scan_log.get("port", "")).strip()
        impact = scan_log.get("event", "").strip()
        severity_justification = scan_log.get("json_event", "").strip()
        service = scan_log.get("target", "").strip()
        # Fall back to a random id when the log carries no scan_id.
        unique_id = scan_log.get("scan_id", uuid.uuid4().hex)

        metadata = all_module_severity_and_desc.get(module_name, {})
        severity_raw = metadata.get("severity", 0)
        description = metadata.get("desc", "")

        # Bucket the 0-10 numeric severity into DefectDojo labels.
        if severity_raw >= 9:
            severity = severity_mapping[5]
        elif severity_raw >= 7:
            severity = severity_mapping[4]
        elif severity_raw >= 4:
            severity = severity_mapping[3]
        elif severity_raw > 0:
            severity = severity_mapping[2]
        else:
            severity = severity_mapping[1]

        findings.append(
            {
                "date": date,
                "title": module_name,
                "description": description.strip(),
                "severity": severity,
                "param": port,
                "impact": impact,
                "severity_justification": severity_justification,
                "service": service,
                "unique_id_from_tool": unique_id,
                "static_finding": False,
                "dynamic_finding": True,
            }
        )
    return json.dumps({"findings": findings}, indent=4)
def create_sarif_report(all_scan_logs):
    """
    Takes all_scan_logs and converts them to a SARIF based json
    format. The schema and version used are 2.1.0 linked below.
    The following conversions are made:
    ruleId: name of the module
    message: event value for each log in all_scan_logs
    locations.physicalLocations.artifactLocation.uri: target value
    webRequest.properties.json_event: json_event value for each log in all_scan_logs
    properties.scan_id: scan_id unique value for each run
    properties.date: date field specified in all_scan_logs

    Args:
        all_scan_logs: list of scan log dicts ("module_name", "event",
            "target", "scan_id", "date", "json_event")

    Returns:
        the SARIF report as a JSON string
    """
    sarif_structure = {
        "$schema": "https://json.schemastore.org/sarif-2.1.0.json",
        "version": "2.1.0",
        "runs": [
            {
                "tool": {
                    "driver": {
                        "name": "Nettacker",
                        "version": "0.4.0",
                        "informationUri": "https://github.com/OWASP/Nettacker",
                    }
                },
                "results": [],
            }
        ],
    }
    # Iterate as "scan_log" (not "log") to avoid shadowing the module logger.
    for scan_log in all_scan_logs:
        sarif_result = {
            "ruleId": scan_log["module_name"],
            "message": {"text": scan_log["event"]},
            "locations": [
                {"physicalLocation": {"artifactLocation": {"uri": scan_log["target"]}}}
            ],
            "properties": {
                "scan_id": scan_log["scan_id"],
                "date": scan_log["date"],
                "json_event": scan_log["json_event"],
            },
        }
        sarif_structure["runs"][0]["results"].append(sarif_result)
    return json.dumps(sarif_structure, indent=2)
def create_report(options, scan_id):
"""
sort all events, create log file in HTML/TEXT/JSON and remove old logs
Args:
options: parsing options
scan_id: scan unique id
Returns:
True if success otherwise None
"""
all_scan_logs = get_logs_by_scan_id(scan_id)
if not all_scan_logs:
log.info(_("no_events_for_report"))
return True
report_path_filename = options.report_path_filename
if (len(report_path_filename) >= 5 and report_path_filename[-5:] == ".html") or (
len(report_path_filename) >= 4 and report_path_filename[-4:] == ".htm"
):
if options.graph_name:
html_graph = build_graph(options.graph_name, all_scan_logs)
else:
html_graph = ""
from nettacker.lib.html_log import log_data
html_table_content = log_data.table_title.format(
html_graph,
log_data.css_1,
"date",
"target",
"module_name",
"port",
"logs",
"json_event",
)
index = 1
for event in all_scan_logs:
log_list = merge_logs_to_list(json.loads(event["json_event"]), [])
html_table_content += log_data.table_items.format(
event["date"],
event["target"],
event["module_name"],
event["port"],
"
".join(log_list) if log_list else "Detected", # event["event"], #log
index,
html.escape(event["json_event"]),
)
index += 1
html_table_content += (
log_data.table_end
+ '