import csv
import html
import importlib
import json
import os
import uuid
from datetime import datetime
from pathlib import Path

import texttable

from nettacker import logger, all_module_severity_and_desc
from nettacker.config import Config, version_info
from nettacker.core.die import die_failure
from nettacker.core.messages import messages as _
from nettacker.core.utils.common import (
    merge_logs_to_list,
    now,
    sanitize_path,
    generate_compare_filepath,
)
from nettacker.database.db import get_logs_by_scan_id, submit_report_to_db, get_options_by_scan_id

log = logger.get_logger()
nettacker_path_config = Config.path


def _write_text_file(path, content):
    """
    write content to path as UTF-8, replacing any existing file

    Args:
        path: destination file path
        content: text to write
    """
    with Path(path).open("w", encoding="utf-8") as output_file:
        output_file.write(content)


def build_graph(graph_name, events):
    """
    build a graph

    Args:
        graph_name: graph name
        events: list of events

    Returns:
        graph in HTML type
    """
    log.info(_("build_graph"))
    engine_name = graph_name.rsplit("_graph")[0]
    try:
        start = getattr(
            importlib.import_module(f"nettacker.lib.graph.{engine_name}.engine"),
            "start",
        )
    except ModuleNotFoundError:
        die_failure(_("graph_module_unavailable").format(graph_name))

    # Build first, then log completion, so the "finish" message is truthful.
    graph = start(events)
    log.info(_("finish_build_graph"))
    return graph


def build_compare_report(compare_results):
    """
    build the compare report

    Args:
        compare_results: final result of the comparison (dict)

    Returns:
        report in html format
    """
    log.info(_("build_compare_report"))
    try:
        build_report = getattr(
            importlib.import_module("nettacker.lib.compare_report.engine"),
            "build_report",
        )
    except ModuleNotFoundError:
        die_failure(_("graph_module_unavailable").format("compare_report"))

    # Build first, then log completion, so the "finish" message is truthful.
    report = build_report(compare_results)
    log.info(_("finish_build_report"))
    return report


def build_text_table(events):
    """
    build a text table with the events generated by a scan

    Args:
        events: all events; each one is read through its "date", "target",
            "module_name", "port" and "json_event" keys

    Returns:
        the drawn table followed by the version details footer, as one string
    """
    _table = texttable.Texttable()
    table_headers = ["date", "target", "module_name", "port", "logs"]
    _table.add_rows([table_headers])
    for event in events:
        # Named event_logs (not "log") so the module-level logger is not shadowed.
        event_logs = merge_logs_to_list(json.loads(event["json_event"]), [])
        _table.add_row(
            [
                event["date"],
                event["target"],
                event["module_name"],
                str(event["port"]),
                "\n".join(event_logs) if event_logs else "Detected",
            ]
        )
    details = version_info()
    return (
        _table.draw()
        + "\n\n"
        + _("nettacker_version_details").format(details[0], details[1], now())
        + "\n"
    )


def create_compare_text_table(results):
    """
    build a one-row text table from the compare results

    Args:
        results: mapping of column name to value; keys become the header row

    Returns:
        the drawn table followed by two newlines
    """
    table = texttable.Texttable()
    table_headers = list(results.keys())
    table.add_rows(
        [
            table_headers,
            [results[col] for col in table_headers],
        ]
    )
    # Each column is exactly as wide as its header text.
    table.set_cols_width([len(header) for header in table_headers])
    return table.draw() + "\n\n"


def _dd_severity_label(severity_raw):
    """
    map a numeric module severity to the label used in the .dd.json report

    Args:
        severity_raw: numeric severity taken from the module metadata

    Returns:
        one of "Critical", "High", "Medium", "Low" or "Info"
    """
    if severity_raw >= 9:
        return "Critical"
    if severity_raw >= 7:
        return "High"
    if severity_raw >= 4:
        return "Medium"
    if severity_raw > 0:
        return "Low"
    return "Info"


def create_dd_specific_json(all_scan_logs):
    """
    convert the scan logs into the "findings" JSON written for .dd.json reports

    Args:
        all_scan_logs: list of scan log dicts; "module_name" and "date" are
            required, the other fields are optional

    Returns:
        JSON string of the form {"findings": [...]} indented by 4 spaces
    """
    findings = []
    for scan_log in all_scan_logs:
        module_name = scan_log["module_name"].strip()
        metadata = all_module_severity_and_desc.get(module_name, {})
        findings.append(
            {
                "date": datetime.strptime(scan_log["date"], "%Y-%m-%d %H:%M:%S.%f").strftime(
                    "%m/%d/%Y"
                ),
                "title": module_name,
                "description": metadata.get("desc", "").strip(),
                "severity": _dd_severity_label(metadata.get("severity", 0)),
                "param": str(scan_log.get("port", "")).strip(),
                "impact": scan_log.get("event", "").strip(),
                "severity_justification": scan_log.get("json_event", "").strip(),
                "service": scan_log.get("target", "").strip(),
                "unique_id_from_tool": scan_log.get("scan_id", uuid.uuid4().hex),
                "static_finding": False,
                "dynamic_finding": True,
            }
        )
    return json.dumps({"findings": findings}, indent=4)


def create_sarif_report(all_scan_logs):
    """
    Takes all_scan_logs and converts them to a SARIF based json format.
    The schema and version used are 2.1.0.
    The following conversions are made:
        ruleId: name of the module
        message: event value for each log in all_scan_logs
        locations.physicalLocation.artifactLocation.uri: target value
        properties.json_event: json_event value for each log in all_scan_logs
        properties.scan_id: scan_id unique value for each run
        properties.date: date field specified in all_scan_logs

    Args:
        all_scan_logs: list of scan log dicts

    Returns:
        SARIF document as a JSON string indented by 2 spaces
    """
    results = [
        {
            "ruleId": scan_log["module_name"],
            "message": {"text": scan_log["event"]},
            "locations": [
                {"physicalLocation": {"artifactLocation": {"uri": scan_log["target"]}}}
            ],
            "properties": {
                "scan_id": scan_log["scan_id"],
                "date": scan_log["date"],
                "json_event": scan_log["json_event"],
            },
        }
        for scan_log in all_scan_logs
    ]
    sarif_structure = {
        "$schema": "https://json.schemastore.org/sarif-2.1.0.json",
        "version": "2.1.0",
        "runs": [
            {
                "tool": {
                    "driver": {
                        "name": "Nettacker",
                        "version": "0.4.0",
                        "informationUri": "https://github.com/OWASP/Nettacker",
                    }
                },
                "results": results,
            }
        ],
    }
    return json.dumps(sarif_structure, indent=2)


def _build_html_report(graph_name, all_scan_logs):
    """
    render the scan logs as the HTML report page

    Args:
        graph_name: graph name, or a falsy value to omit the graph
        all_scan_logs: list of scan log dicts

    Returns:
        the HTML document as a string (without trailing newline)
    """
    from nettacker.lib.html_log import log_data

    html_graph = build_graph(graph_name, all_scan_logs) if graph_name else ""
    parts = [
        log_data.table_title.format(
            html_graph,
            log_data.css_1,
            "date",
            "target",
            "module_name",
            "port",
            "logs",
            "json_event",
        )
    ]
    # NOTE(review): the markup literals below ("<br>", the json_length <div> and
    # the footer <p>) were reconstructed; the text this code was recovered from
    # had its HTML tags stripped. Confirm against the project's templates.
    # Only json_event is HTML-escaped; the other fields are inserted verbatim.
    for index, event in enumerate(all_scan_logs, start=1):
        log_list = merge_logs_to_list(json.loads(event["json_event"]), [])
        parts.append(
            log_data.table_items.format(
                event["date"],
                event["target"],
                event["module_name"],
                event["port"],
                "<br>".join(log_list) if log_list else "Detected",
                index,
                html.escape(event["json_event"]),
            )
        )
    details = version_info()
    parts.append(
        log_data.table_end
        + '<div id="json_length">'
        + str(len(all_scan_logs))
        + "</div>"
        + '<p class="footer">'
        + _("nettacker_version_details").format(details[0], details[1], now())
        + "</p>"
        + log_data.json_parse_js
    )
    return "".join(parts)


def create_report(options, scan_id):
    """
    create the report file for a scan in the format selected by the file
    extension (HTML, DefectDojo JSON, JSON, SARIF, CSV, otherwise a text table)
    and record the report in the database

    Args:
        options: parsing options
        scan_id: scan unique id

    Returns:
        True, both when the report was written and when there were no events
    """
    all_scan_logs = get_logs_by_scan_id(scan_id)
    if not all_scan_logs:
        log.info(_("no_events_for_report"))
        return True

    report_path_filename = options.report_path_filename
    # Extensions are matched case-insensitively for every format.
    report_type = report_path_filename.lower()

    if report_type.endswith((".html", ".htm")):
        _write_text_file(
            report_path_filename,
            _build_html_report(options.graph_name, all_scan_logs) + "\n",
        )
    elif report_type.endswith(".dd.json"):
        # Must be tested before the plain ".json" case, which it also matches.
        _write_text_file(report_path_filename, create_dd_specific_json(all_scan_logs) + "\n")
    elif report_type.endswith(".json"):
        _write_text_file(report_path_filename, json.dumps(all_scan_logs) + "\n")
    elif report_type.endswith(".sarif"):
        _write_text_file(report_path_filename, create_sarif_report(all_scan_logs) + "\n")
    elif report_type.endswith(".csv"):
        keys = all_scan_logs[0].keys()
        # newline="" lets the csv module control line endings itself.
        with Path(report_path_filename).open("a", encoding="utf-8", newline="") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=keys)
            # The file is opened in append mode: only emit the header when the
            # file is empty so repeated runs do not scatter header rows in it.
            if csvfile.tell() == 0:
                writer.writeheader()
            for scan_log in all_scan_logs:
                writer.writerow(
                    {key: value for key, value in scan_log.items() if key in keys}
                )
    else:
        # Build once so the saved file and the console output are identical.
        text_table = build_text_table(all_scan_logs)
        _write_text_file(report_path_filename, text_table)
        log.write(text_table)

    submit_report_to_db(
        {
            "date": datetime.now(),
            "scan_id": scan_id,
            "options": vars(options),
        }
    )
    log.info(_("file_saved").format(report_path_filename))
    return True


def create_compare_report(options, scan_id):
    """
    if compare_id is given then create the report of comparison b/w scans

    Args:
        options: parsing options, either a dict or an object with attributes
        scan_id: scan unique id

    Returns:
        True if success, None when either scan has no events

    Raises:
        PermissionError: when the report path resolves outside the results directory
    """
    comp_id = options["scan_compare_id"] if isinstance(options, dict) else options.scan_compare_id
    scan_log_curr = get_logs_by_scan_id(scan_id)
    scan_logs_comp = get_logs_by_scan_id(comp_id)

    if not scan_log_curr:
        log.info(_("no_events_for_report"))
        return None
    if not scan_logs_comp:
        log.info(_("no_scan_to_compare"))
        return None

    scan_opts_curr = get_options_by_scan_id(scan_id)
    scan_opts_comp = get_options_by_scan_id(comp_id)

    curr_target_set = {tuple(json.loads(item["options"])["targets"]) for item in scan_opts_curr}
    comp_target_set = {tuple(json.loads(item["options"])["targets"]) for item in scan_opts_comp}

    curr_modules_ports = {
        (item["target"], item["module_name"], item["port"]) for item in scan_log_curr
    }
    comp_modules_ports = {
        (item["target"], item["module_name"], item["port"]) for item in scan_logs_comp
    }

    compare_results = {
        "curr_scan_details": (scan_id, scan_log_curr[0]["date"]),
        "comp_scan_details": (comp_id, scan_logs_comp[0]["date"]),
        "curr_target_set": tuple(curr_target_set),
        "comp_target_set": tuple(comp_target_set),
        "curr_scan_result": tuple(curr_modules_ports),
        "comp_scan_result": tuple(comp_modules_ports),
        "new_targets_discovered": tuple(curr_modules_ports - comp_modules_ports),
        "old_targets_not_detected": tuple(comp_modules_ports - curr_modules_ports),
    }

    if isinstance(options, dict):
        compare_report_path_filename = options["compare_report_path_filename"]
    else:
        compare_report_path_filename = options.compare_report_path_filename
    # Same fallback for both option shapes when no filename was supplied.
    if not compare_report_path_filename:
        compare_report_path_filename = generate_compare_filepath(scan_id)

    base_path = os.path.normpath(str(nettacker_path_config.results_dir))
    compare_report_path_filename = sanitize_path(compare_report_path_filename)
    fullpath = os.path.normpath(os.path.join(base_path, compare_report_path_filename))
    # Compare against the base WITH a trailing separator: a bare prefix test
    # would also accept sibling directories such as "<results_dir>_other/...".
    if not fullpath.startswith(os.path.join(base_path, "")):
        raise PermissionError(f"compare report path is outside the results directory: {fullpath}")

    # Extensions are matched case-insensitively for every format.
    report_type = fullpath.lower()

    if report_type.endswith((".html", ".htm")):
        _write_text_file(fullpath, build_compare_report(compare_results) + "\n")
    elif report_type.endswith(".json"):
        _write_text_file(fullpath, json.dumps(compare_results) + "\n")
    elif report_type.endswith(".csv"):
        keys = compare_results.keys()
        # newline="" lets the csv module control line endings itself.
        with Path(fullpath).open("a", encoding="utf-8", newline="") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=keys)
            # Append mode: write the header only for a new/empty file.
            if csvfile.tell() == 0:
                writer.writeheader()
            writer.writerow(compare_results)
    else:
        # Build once and reuse for both the file and the console output.
        text_table = create_compare_text_table(compare_results)
        _write_text_file(fullpath, text_table)
        log.write(text_table)

    log.info(_("compare_report_saved").format(fullpath))
    return True