adding new output types (#1085)

* SARIF fully done, a little left on dd.json

* This is good to go now

* pre-commit fixes

* updated

* removed redundancy and reduced I/O operations

* ruff fixes

* fixed tests for Path.open

* applied CodeRabbit suggestions

* added relevant documentation

* slight change in doc

* removing empty files that were added by mistake

* updated datetime format according to CodeRabbit's suggestions
Achintya Jai 2025-09-07 23:52:29 +05:30 committed by GitHub
parent 2fea1e44f0
commit e934f748ee
6 changed files with 149 additions and 21 deletions

.gitignore
View File

@@ -20,6 +20,9 @@ logs.txt
results.*
.owasp-nettacker*
.nettacker/data*
.data*
*.sarif
*.dd.json
*.DS_Store
*.swp

View File

@@ -55,7 +55,7 @@ Engine:
--verbose-event enable verbose event to see state of each thread
-V, --version show software version
-o REPORT_PATH_FILENAME, --output REPORT_PATH_FILENAME
save all logs in file (results.txt, results.csv, results.html, results.json)
save all logs in file (results.txt, results.csv, results.html, results.json, results.sarif, results.dd.json)
--graph GRAPH_NAME build a graph of all activities and information, you must use HTML output. available graphs:
['d3_tree_v2_graph', 'd3_tree_v1_graph']
-h, --help Show Nettacker Help Menu
@@ -203,7 +203,7 @@ usage: Nettacker [-L LANGUAGE] [-v] [--verbose-event] [-V] [-o REPORT_PATH_FILEN
--verbose-event enable verbose event to see state of each thread
-V, --version show software version
-o REPORT_PATH_FILENAME, --output REPORT_PATH_FILENAME
save all logs in file (result.txt, result.html, results.json)
save all logs in file (results.txt, results.html, results.csv, results.json, results.sarif, results.dd.json)
--graph GRAPH_NAME build a graph of all activities and information, you must use HTML output. available graphs:
['d3_tree_v1_graph', 'd3_tree_v2_graph']
-h, --help Show Nettacker Help Menu
@@ -529,6 +529,22 @@ def nettacker_user_application_config():
}
```
* Nettacker supports five different output types for the final report:
1. HTML (.html) -> this also renders the graph
2. CSV (.csv)
3. JSON (.json)
4. SARIF (.sarif)
5. DefectDojo-compatible JSON (.dd.json)
These output types help with integrating Nettacker with different software and dashboards. To set the output format, use the `-o` or `--output` flag:
```
python nettacker.py -i 192.168.1.1/24 --profile information_gathering -o report.sarif
python nettacker.py -i 192.168.1.1/24 --profile information_gathering -o report.json
python nettacker.py -i 192.168.1.1/24 --profile information_gathering --output report.dd.json
```
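The generated files can then be consumed programmatically; a minimal sketch (using the report names from the commands above and the structures this commit produces):
```
import json
from pathlib import Path

sarif = json.loads(Path("report.sarif").read_text())
print(len(sarif["runs"][0]["results"]), "SARIF results")

dd = json.loads(Path("report.dd.json").read_text())
print(len(dd["findings"]), "findings for DefectDojo")
```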
# API and WebUI
API and WebUI are new interfaces through which you can send your commands to Nettacker. Technically, the WebUI was built on top of the existing API to demonstrate it and can serve as another, easier interface. To start using this feature, simply run `python nettacker.py --start-api`.
```

View File

@@ -0,0 +1 @@
all_module_severity_and_desc = {}

View File

@@ -4,6 +4,7 @@ from argparse import ArgumentParser
import yaml
from nettacker import all_module_severity_and_desc
from nettacker.config import version_info, Config
from nettacker.core.die import die_failure, die_success
from nettacker.core.ip import (
@@ -80,7 +81,6 @@ class ArgParser(ArgumentParser):
an array of all module names
"""
# Search for Modules
module_names = {}
for module_name in sorted(Config.path.modules_dir.glob("**/*.yaml")):
library = str(module_name).split("/")[-1].split(".")[0]
@@ -88,7 +88,11 @@ class ArgParser(ArgumentParser):
module = f"{library}_{category}"
contents = yaml.safe_load(TemplateLoader(module).open().split("payload:")[0])
module_names[module] = contents["info"] if full_details else None
info = contents.get("info", {})
all_module_severity_and_desc[module] = {
"severity": info.get("severity", 0),
"desc": info.get("description", ""),
}
if len(module_names) == limit:
module_names["..."] = {}
break
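For illustration, a minimal sketch of how one module's info block lands in the registry (the module name, category, and YAML contents here are hypothetical; the parsing mirrors the loop above):
```
import yaml

# Hypothetical module file, e.g. modules/scan/port.yaml -> module key "port_scan".
sample_template = """
info:
  name: port scanner
  severity: 3
  description: Scan common TCP ports on the target.
payload:
  - library: socket
"""

all_module_severity_and_desc = {}
info = yaml.safe_load(sample_template.split("payload:")[0]).get("info", {})
all_module_severity_and_desc["port_scan"] = {
    "severity": info.get("severity", 0),
    "desc": info.get("description", ""),
}
# -> {"port_scan": {"severity": 3, "desc": "Scan common TCP ports on the target."}}
```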

View File

@@ -3,11 +3,13 @@ import html
import importlib
import json
import os
import uuid
from datetime import datetime
from pathlib import Path
import texttable
from nettacker import logger
from nettacker import logger, all_module_severity_and_desc
from nettacker.config import Config, version_info
from nettacker.core.die import die_failure
from nettacker.core.messages import messages as _
@@ -119,6 +121,99 @@ def create_compare_text_table(results):
return table.draw() + "\n\n"
def create_dd_specific_json(all_scan_logs):
severity_mapping = {1: "Info", 2: "Low", 3: "Medium", 4: "High", 5: "Critical"}
findings = []
for log in all_scan_logs:
module_name = log["module_name"].strip()
date = datetime.strptime(log["date"], "%Y-%m-%d %H:%M:%S.%f").strftime("%m/%d/%Y")
port = str(log.get("port", "")).strip()
impact = log.get("event", "").strip()
severity_justification = log.get("json_event", "").strip()
service = log.get("target", "").strip()
unique_id = log.get("scan_id", uuid.uuid4().hex)
metadata = all_module_severity_and_desc.get(module_name, {})
severity_raw = metadata.get("severity", 0)
description = metadata.get("desc", "")
if severity_raw >= 9:
severity = severity_mapping[5]
elif severity_raw >= 7:
severity = severity_mapping[4]
elif severity_raw >= 4:
severity = severity_mapping[3]
elif severity_raw > 0:
severity = severity_mapping[2]
else:
severity = severity_mapping[1]
findings.append(
{
"date": date,
"title": module_name,
"description": description.strip(),
"severity": severity,
"param": port,
"impact": impact,
"severity_justification": severity_justification,
"service": service,
"unique_id_from_tool": unique_id,
"static_finding": False,
"dynamic_finding": True,
}
)
return json.dumps({"findings": findings}, indent=4)
def create_sarif_report(all_scan_logs):
"""
Takes all_scan_logs and converts them to a SARIF-based JSON
format. The schema version used is 2.1.0 (schema URL below).
The following conversions are made:
ruleId: name of the module
message: event value for each log in all_scan_logs
locations.physicalLocation.artifactLocation.uri: target value
webRequest.properties.json_event: json_event value for each log in all_scan_logs
properties.scan_id: scan_id unique value for each run
properties.date: date field specified in all_scan_logs
"""
sarif_structure = {
"$schema": "https://json.schemastore.org/sarif-2.1.0.json",
"version": "2.1.0",
"runs": [
{
"tool": {
"driver": {
"name": "Nettacker",
"version": "0.4.0",
"informationUri": "https://github.com/OWASP/Nettacker",
}
},
"results": [],
}
],
}
for log in all_scan_logs:
sarif_result = {
"ruleId": log["module_name"],
"message": {"text": log["event"]},
"locations": [{"physicalLocation": {"artifactLocation": {"uri": log["target"]}}}],
"properties": {
"scan_id": log["scan_id"],
"date": log["date"],
"json_event": log["json_event"],
},
}
sarif_structure["runs"][0]["results"].append(sarif_result)
return json.dumps(sarif_structure, indent=2)
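And a matching sketch for the SARIF writer, showing the single result object produced for one hypothetical log entry:
```
sample_logs = [
    {
        "module_name": "port_scan",
        "event": "port 80 is open",
        "target": "192.168.1.10",
        "scan_id": "abc123",
        "date": "2025-09-07 23:52:29.000000",
        "json_event": '{"port": 80}',
    }
]
report = json.loads(create_sarif_report(sample_logs))
print(report["runs"][0]["results"][0])
# {'ruleId': 'port_scan',
#  'message': {'text': 'port 80 is open'},
#  'locations': [{'physicalLocation': {'artifactLocation': {'uri': '192.168.1.10'}}}],
#  'properties': {'scan_id': 'abc123',
#                 'date': '2025-09-07 23:52:29.000000',
#                 'json_event': '{"port": 80}'}}
```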
def create_report(options, scan_id):
"""
sort all events, create log file in HTML/TEXT/JSON and remove old logs
@@ -179,25 +274,34 @@ def create_report(options, scan_id):
+ "</p>"
+ log_data.json_parse_js
)
with open(report_path_filename, "w", encoding="utf-8") as report_file:
with Path(report_path_filename).open("w", encoding="utf-8") as report_file:
report_file.write(html_table_content + "\n")
report_file.close()
elif len(report_path_filename) >= 5 and report_path_filename[-8:].lower() == ".dd.json":
with Path(report_path_filename).open("w", encoding="utf-8") as report_file:
dd_content_json = create_dd_specific_json(all_scan_logs)
report_file.write(dd_content_json + "\n")
elif len(report_path_filename) >= 5 and report_path_filename[-5:] == ".json":
with open(report_path_filename, "w", encoding="utf-8") as report_file:
with Path(report_path_filename).open("w", encoding="utf-8") as report_file:
report_file.write(str(json.dumps(all_scan_logs)) + "\n")
report_file.close()
elif len(report_path_filename) >= 6 and report_path_filename[-6:].lower() == ".sarif":
with Path(report_path_filename).open("w", encoding="utf-8") as report_file:
sarif_content = create_sarif_report(all_scan_logs)
report_file.write(sarif_content + "\n")
elif len(report_path_filename) >= 5 and report_path_filename[-4:] == ".csv":
keys = all_scan_logs[0].keys()
with open(report_path_filename, "a") as csvfile:
with Path(report_path_filename).open("a") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=keys)
writer.writeheader()
for log_list in all_scan_logs:
dict_data = {key: value for key, value in log_list.items() if key in keys}
writer.writerow(dict_data)
csvfile.close()
else:
with open(report_path_filename, "w", encoding="utf-8") as report_file:
with Path(report_path_filename).open("w", encoding="utf-8") as report_file:
report_file.write(build_text_table(all_scan_logs))
log.write(build_text_table(all_scan_logs))
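One detail worth noting: a `.dd.json` path also ends in `.json`, so the `.dd.json` branch has to come before the plain `.json` branch. A standalone sketch of the same suffix dispatch (pick_writer is illustrative, not part of the codebase):
```
def pick_writer(report_path_filename):
    """Illustrative suffix dispatch mirroring create_report's elif chain."""
    name = report_path_filename.lower()
    if name.endswith((".html", ".htm")):
        return "html"
    elif name.endswith(".dd.json"):  # must be checked before the plain .json branch
        return "dd.json"
    elif name.endswith(".json"):
        return "json"
    elif name.endswith(".sarif"):
        return "sarif"
    elif name.endswith(".csv"):
        return "csv"
    return "txt"


assert pick_writer("report.dd.json") == "dd.json"
assert pick_writer("report.json") == "json"
```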
@@ -278,20 +382,20 @@ def create_compare_report(options, scan_id):
len(fullpath) >= 4 and fullpath[-4:] == ".htm"
):
html_report = build_compare_report(compare_results)
with open(fullpath, "w", encoding="utf-8") as compare_report:
with Path(fullpath).open("w", encoding="utf-8") as compare_report:
compare_report.write(html_report + "\n")
elif len(fullpath) >= 5 and fullpath[-5:] == ".json":
with open(fullpath, "w", encoding="utf-8") as compare_report:
with Path(fullpath).open("w", encoding="utf-8") as compare_report:
compare_report.write(str(json.dumps(compare_results)) + "\n")
elif len(fullpath) >= 5 and fullpath[-4:] == ".csv":
keys = compare_results.keys()
with open(fullpath, "a") as csvfile:
with Path(fullpath).open("a") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=keys)
if csvfile.tell() == 0:
writer.writeheader()
writer.writerow(compare_results)
else:
with open(fullpath, "w", encoding="utf-8") as compare_report:
with Path(fullpath).open("w", encoding="utf-8") as compare_report:
compare_report.write(create_compare_text_table(compare_results))
log.write(create_compare_text_table(compare_results))

View File

@@ -130,7 +130,7 @@ def test_create_report_html(
{"date": "now", "target": "x", "module_name": "mod", "port": 80, "json_event": "{}"}
],
)
@patch("builtins.open", new_callable=mock_open)
@patch("nettacker.core.graph.Path.open", new_callable=mock_open)
@patch("nettacker.core.graph.submit_report_to_db")
def test_json_report(mock_submit, mock_open_file, mock_get_logs):
options = MagicMock()
@@ -148,7 +148,7 @@ def test_json_report(mock_submit, mock_open_file, mock_get_logs):
],
)
@patch("csv.DictWriter")
@patch("builtins.open", new_callable=mock_open)
@patch("nettacker.core.graph.Path.open", new_callable=mock_open)
@patch("nettacker.core.graph.submit_report_to_db")
def test_csv_report(mock_submit, mock_open_file, mock_csv_writer, mock_get_logs):
options = MagicMock()
@@ -168,7 +168,7 @@ def test_csv_report(mock_submit, mock_open_file, mock_csv_writer, mock_get_logs)
],
)
@patch("nettacker.core.graph.build_text_table", return_value="text table")
@patch("builtins.open", new_callable=mock_open)
@patch("nettacker.core.graph.Path.open", new_callable=mock_open)
@patch("nettacker.core.graph.submit_report_to_db")
def test_text_report(mock_submit, mock_open_file, mock_build_text, mock_get_logs):
options = MagicMock()
@@ -182,7 +182,7 @@ def test_text_report(mock_submit, mock_open_file, mock_build_text, mock_get_logs
@patch("nettacker.core.graph.get_logs_by_scan_id")
@patch("nettacker.core.graph.get_options_by_scan_id")
@patch("nettacker.core.graph.build_compare_report", return_value="<html-report>")
@patch("nettacker.core.graph.open", new_callable=mock_open)
@patch("nettacker.core.graph.Path.open", new_callable=mock_open)
@patch("nettacker.core.graph.os.path.normpath", side_effect=lambda x: x)
@patch("nettacker.core.graph.os.path.join", side_effect=lambda *args: "/".join(args))
@patch("nettacker.core.graph.create_compare_text_table", return_value="text-report")
@@ -267,7 +267,7 @@ def test_permission_error(mock_join, mock_norm, mock_opts, mock_logs):
@patch("nettacker.core.graph.get_logs_by_scan_id")
@patch("nettacker.core.graph.get_options_by_scan_id")
@patch("nettacker.core.graph.create_compare_text_table", return_value="some-text")
@patch("nettacker.core.graph.open", new_callable=mock_open)
@patch("nettacker.core.graph.Path.open", new_callable=mock_open)
def test_dict_options(mock_open_file, mock_text, mock_opts, mock_logs):
dummy_log = {
"target": "1.1.1.1",