Feature: Exclude certain ports from being scanned (#1099)

* feature: exclude certain ports from being scanned

* ruff fixes

* handling exception for vulnerablility modules

* not relying on try and except

* updated documentation, changed flag

* test case for module file

* update test

* mocking the database calls, that's probably the issue

* removed breaking test for now

* coderabbit suggested change, minor code refactoring

* ruff fixes

---------

Signed-off-by: Achintya Jai <153343775+pUrGe12@users.noreply.github.com>
This commit is contained in:
Achintya Jai 2025-07-27 13:41:53 +05:30 committed by GitHub
parent e450c819d8
commit 9a0006ea42
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 432 additions and 10 deletions

View File

@ -35,9 +35,10 @@ By using the `--help`/`-h` switch you can read the help menu in the CLI:
[2024-09-26 07:51:09][+] 106 modules loaded ...
usage: Nettacker [-L LANGUAGE] [-v] [--verbose-event] [-V] [-o REPORT_PATH_FILENAME] [--graph GRAPH_NAME] [-h]
[-i TARGETS] [-l TARGETS_LIST] [-m SELECTED_MODULES] [--modules-extra-args MODULES_EXTRA_ARGS]
[--show-all-modules] [--profile PROFILES] [--show-all-profiles] [-x EXCLUDED_MODULES] [-u USERNAMES]
[-U USERNAMES_LIST] [-p PASSWORDS] [-P PASSWORDS_LIST] [-g PORTS] [--user-agent USER_AGENT]
[-T TIMEOUT] [-w TIME_SLEEP_BETWEEN_REQUESTS] [-r] [-s] [-d] [-t THREAD_PER_HOST] [-H HTTP_HEADER]
[--show-all-modules] [--profile PROFILES] [--show-all-profiles] [-x EXCLUDED_MODULES]
[-X EXCLUDED_PORTS] [-u USERNAMES] [-U USERNAMES_LIST] [-p PASSWORDS] [-P PASSWORDS_LIST] [-g PORTS]
[--user-agent USER_AGENT] [-T TIMEOUT] [-w TIME_SLEEP_BETWEEN_REQUESTS] [-r] [-s] [-d] [-t THREAD_PER_HOST]
[--show-all-modules] [--profile PROFILES] [--show-all-profiles] [-x EXCLUDED_MODULES] [-H HTTP_HEADER]
[-M PARALLEL_MODULE_SCAN] [--set-hardware-usage SET_HARDWARE_USAGE] [-R SOCKS_PROXY]
[--retries RETRIES] [--ping-before-scan] [-K SCAN_COMPARE_ID] [-J COMPARE_REPORT_PATH_FILENAME]
[--start-api] [--api-host API_HOSTNAME] [--api-port API_PORT] [--api-debug-mode]
@ -87,6 +88,8 @@ Method:
'adobe_coldfusion_cve_2023_26360_vuln', 'apache_cve_2021_41773_vuln',
'apache_cve_2021_42013_vuln', 'apache_ofbiz_cve_2024_38856_vuln', 'apache_struts_vuln',
'aviatrix_cve_2021_40870_vuln', 'cisco_hyperflex_cve_2021_1497_vuln']
-X EXCLUDED_PORTS, --exclude-ports
Ports to exclude (e.g. 80 || 80,443|| 1000-1300)
-u USERNAMES, --usernames USERNAMES
username(s) list, separate with ","
-U USERNAMES_LIST, --users-list USERNAMES_LIST
@ -316,6 +319,7 @@ https://owasp.org
```
python nettacker.py -i 192.168.1.1,192.168.1.2-192.168.1.10,127.0.0.1,owasp.org,192.168.2.1/24 -m port_scan -g 20-100 -t 10
python nettacker.py -l targets.txt -m all -x port_scan -g 20-100 -t 5 -u root -p 123456,654321,123123
python nettacker.py -l targets.txt -m all -t 100 -d -u root -p 12345,432123 -X 80
```
* Here are some more command line examples:

View File

@ -134,6 +134,7 @@ class DefaultSettings(ConfigBase):
"""OWASP Nettacker Default Configuration"""
excluded_modules = None
excluded_ports = None
graph_name = "d3_tree_v2_graph"
language = "en"
modules_extra_args = None

View File

@ -254,6 +254,14 @@ class ArgParser(ArgumentParser):
default=Config.settings.excluded_modules,
help=_("exclude_scan_method").format(exclude_modules),
)
method_options.add_argument(
"-X",
"--exclude-ports",
action="store",
dest="excluded_ports",
default=Config.settings.excluded_ports,
help=_("exclude_ports"),
)
method_options.add_argument(
"-u",
"--usernames",
@ -649,21 +657,35 @@ class ArgParser(ArgumentParser):
options.selected_modules.remove(excluded_module)
# Check port(s)
if options.ports:
tmp_ports = []
tmp_ports = set()
for port in options.ports.split(","):
try:
if "-" in port:
for port_number in range(
int(port.split("-")[0]), int(port.split("-")[1]) + 1
):
if port_number not in tmp_ports:
tmp_ports.append(port_number)
tmp_ports.add(port_number)
else:
if int(port) not in tmp_ports:
tmp_ports.append(int(port))
tmp_ports.add(int(port))
except Exception:
die_failure(_("ports_int"))
options.ports = tmp_ports
options.ports = list(tmp_ports)
# Check for excluded ports
if options.excluded_ports:
tmp_excluded_ports = set()
for excluded_port in options.excluded_ports.split(","):
try:
if "-" in excluded_port:
for excluded_port_number in range(
int(excluded_port.split("-")[0]), int(excluded_port.split("-")[1]) + 1
):
tmp_excluded_ports.add(excluded_port_number)
else:
tmp_excluded_ports.add(int(excluded_port))
except Exception:
die_failure(_("ports_int"))
options.excluded_ports = list(tmp_excluded_ports)
if options.user_agent == "random_user_agent":
options.user_agents = open(Config.path.user_agents_file).read().split("\n")

View File

@ -109,6 +109,12 @@ class Module:
index_payload += 1
def generate_loops(self):
if self.module_inputs["excluded_ports"]:
excluded_port_set = set(self.module_inputs["excluded_ports"])
if self.module_content and "ports" in self.module_content["payloads"][0]["steps"][0]:
all_ports = self.module_content["payloads"][0]["steps"][0]["ports"]
all_ports[:] = [port for port in all_ports if port not in excluded_port_set]
self.module_content["payloads"] = expand_module_steps(self.module_content["payloads"])
def sort_loops(self):
@ -152,7 +158,6 @@ class Module:
importlib.import_module(f"nettacker.core.lib.{library.lower()}"),
f"{library.capitalize()}Engine",
)()
for step in payload["steps"]:
for sub_step in step:
thread = Thread(

View File

@ -126,4 +126,5 @@ compare_report_saved: "compare results saved in {0}"
build_compare_report: "building compare report"
finish_build_report: "Finished building compare report"
user_wordlist: "Allows users to enter their own wordlist"
exclude_ports: "Ports to exclude (e.g. 80 || 80,443|| 1000-1300)"
http_header: "Add custom HTTP headers to requests (format: 'key: value'). For multiple headers, use multiple -H flags"

View File

@ -0,0 +1,389 @@
import json
from unittest.mock import patch, MagicMock
import pytest
from nettacker.core.module import Module
class DummyOptions:
def __init__(self):
self.modules_extra_args = {"foo": "bar"}
self.skip_service_discovery = False
self.time_sleep_between_requests = 0
self.thread_per_host = 2
@pytest.fixture
def options():
return DummyOptions()
@pytest.fixture
def module_args():
return {
"target": "127.0.0.1",
"scan_id": "scan123",
"process_number": 1,
"thread_number": 1,
"total_number_threads": 1,
}
@patch("nettacker.core.module.TemplateLoader")
def test_init_and_service_discovery_signature(mock_loader, options, module_args):
mock_instance = MagicMock()
mock_instance.load.return_value = {
"payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
}
mock_loader.return_value = mock_instance
module = Module("port_scan", options, **module_args)
assert "http" in module.service_discovery_signatures
@patch("os.listdir", return_value=["http.py"])
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_load_with_service_discovery(
mock_loader, mock_find_events, mock_listdir, options, module_args
):
mock_loader_inst = MagicMock()
mock_loader_inst.load.return_value = {
"payloads": [
{
"library": "http",
"steps": [{"response": {"conditions": {"service": {"http": {}}}}}],
}
]
}
mock_loader.return_value = mock_loader_inst
mock_find_events.return_value = [
MagicMock(json_event='{"port": 80, "response": {"conditions_results": {"http": {}}}}')
]
module = Module("test_module", options, **module_args)
module.load()
assert module.discovered_services == {"http": [80]}
assert len(module.module_content["payloads"]) == 1
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_sort_loops(mock_loader, mock_find_events, options, module_args):
mock_loader_inst = MagicMock()
mock_loader_inst.load.return_value = {
"payloads": [
{
"library": "http",
"steps": [
{"response": {"conditions": {"service": {}}}},
{
"response": {
"conditions": {},
"dependent_on_temp_event": True,
"save_to_temp_events_only": True,
}
},
{"response": {"conditions": {}, "dependent_on_temp_event": True}},
],
}
]
}
mock_loader.return_value = mock_loader_inst
mock_event = MagicMock()
mock_event.json_event = json.dumps(
{"port": 80, "response": {"conditions_results": {"http": True}}}
)
mock_find_events.return_value = [mock_event]
module = Module("test_module", options, **module_args)
module.libraries = ["http"]
module.load() # Should not raise
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_start_unsupported_library(mock_loader, mock_find_events, options, module_args):
mock_loader_inst = MagicMock()
mock_loader_inst.load.return_value = {
"payloads": [
{
"library": "unsupported_lib",
"steps": [{"step_id": 1, "response": {"conditions": {"service": {}}}}],
}
]
}
mock_loader.return_value = mock_loader_inst
mock_event = MagicMock()
mock_event.json_event = json.dumps(
{"port": 1234, "response": {"conditions_results": {"unsupported_lib": True}}}
)
mock_find_events.return_value = [mock_event]
module = Module("test_module", options, **module_args)
module.libraries = ["http"]
module.service_discovery_signatures.append("unsupported_lib")
module.load()
result = module.start()
assert result is None
def template_loader_side_effect(name, inputs):
# NOT A TEST CASE
mock_instance = MagicMock()
# as in inside Module.__init__
if name == "port_scan":
mock_instance.load.return_value = {
"payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
}
# as in module.load()
elif name == "test_module":
mock_instance.load.return_value = {
"payloads": [
{
"library": "http",
"steps": [
[{"response": {"conditions": {"service": {}}}}],
[
{
"response": {
"conditions": {},
"dependent_on_temp_event": True,
"save_to_temp_events_only": True,
}
}
],
[{"response": {"conditions": {}, "dependent_on_temp_event": True}}],
],
}
]
}
else:
raise ValueError(f"Unexpected module name: {name}")
return mock_instance
@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda step, _: step)
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_sort_loops_behavior(mock_loader_cls, mock_find_events, mock_parse, options, module_args):
# This one is painful
mock_loader_cls.side_effect = template_loader_side_effect
mock_event = MagicMock()
mock_event.json_event = json.dumps(
{"port": 80, "response": {"conditions_results": {"http": True}}}
)
mock_find_events.return_value = [mock_event]
module = Module("test_module", options, **module_args)
module.libraries = ["http"]
module.load()
module.sort_loops()
steps = module.module_content["payloads"][0]["steps"]
assert steps[0][0]["response"]["conditions"] == {"service": {}}
assert steps[1][0]["response"]["dependent_on_temp_event"] is True
assert steps[1][0]["response"]["save_to_temp_events_only"] is True
assert steps[2][0]["response"]["dependent_on_temp_event"] is True
assert "save_to_temp_events_only" not in steps[2][0]["response"]
def start_test_loader_side_effect(name, inputs):
# HELPER for start test
mock_inst = MagicMock()
if name == "port_scan":
mock_inst.load.return_value = {
"payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
}
elif name == "test_module":
mock_inst.load.return_value = {
"payloads": [
{
"library": "http",
"steps": [[{"response": {}, "id": 1}], [{"response": {}, "id": 2}]],
}
]
}
else:
raise ValueError(f"Unexpected module name: {name}")
return mock_inst
@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda x, _: x)
@patch("nettacker.core.module.log")
@patch("nettacker.core.module.TemplateLoader")
@patch("nettacker.core.module.find_events")
def test_start_library_not_supported(
mock_find_events,
mock_loader_cls,
mock_log,
mock_parse,
module_args,
):
def loader_side_effect_specific(name, inputs):
mock_inst = MagicMock()
if name == "port_scan":
mock_inst.load.return_value = {
"payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
}
elif name == "test_module":
mock_inst.load.return_value = {
"payloads": [
{
"library": "unsupported_lib",
"steps": [
[{"id": 1}],
],
}
]
}
return mock_inst
mock_loader_cls.side_effect = loader_side_effect_specific
mock_event = MagicMock()
mock_event.json_event = json.dumps(
{"port": 80, "response": {"conditions_results": {"http": True}}}
)
mock_find_events.return_value = [mock_event]
# Had to add this small workaround
class DummyOptionsSpecific:
def __init__(self):
self.modules_extra_args = {}
self.skip_service_discovery = True
self.time_sleep_between_requests = 0
self.thread_per_host = 2
options = DummyOptionsSpecific()
module = Module("test_module", options, **module_args)
module.libraries = ["http"]
module.load()
result = module.start()
assert result is None
mock_log.warn.assert_called_once()
assert "unsupported_lib" in mock_log.warn.call_args[0][0]
@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda step, _: step)
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_load_appends_port_to_existing_protocol(
mock_loader_cls,
mock_find_events,
mock_parse,
options,
module_args,
):
def loader_side_effect_specific(name, inputs):
mock_inst = MagicMock()
mock_inst.load.return_value = {
"payloads": [
{
"library": "http",
"steps": [
{"response": {"conditions": {"service": {}}}} # .load() requires no []
],
}
]
}
return mock_inst
mock_loader_cls.side_effect = loader_side_effect_specific
mock_find_events.return_value = [
MagicMock(
json_event=json.dumps({"port": 80, "response": {"conditions_results": {"http": {}}}})
),
MagicMock(
json_event=json.dumps({"port": 443, "response": {"conditions_results": {"http": {}}}})
),
]
module = Module("test_module", options, **module_args)
module.libraries = ["http"]
module.service_discovery_signatures = ["http"]
module.load()
assert module.discovered_services == {"http": [80, 443]}
@patch("nettacker.core.module.expand_module_steps")
@patch("nettacker.core.module.TemplateLoader")
def test_generate_loops_with_excluded_ports_and_ports_in_content(
mock_loader, mock_expand_steps, options, module_args
):
mock_instance = MagicMock()
return_value = {
"payloads": [
{
"steps": [
{
"ports": [80, 443, 8080],
"response": {"conditions": {"service": {}}},
}
]
}
]
}
mock_instance.load.return_value = return_value
mock_loader.return_value = mock_instance
mock_expand_steps.side_effect = lambda x: x
module = Module("test_module", options, **module_args)
module.module_inputs = {"excluded_ports": [443, 8080]}
module.module_content = return_value
module.generate_loops()
expected_ports = [80] # 443 and 8080 should be excluded
actual_ports = module.module_content["payloads"][0]["steps"][0]["ports"]
assert actual_ports == expected_ports
@patch("nettacker.core.module.expand_module_steps")
@patch("nettacker.core.module.TemplateLoader")
def test_generate_loops_with_excluded_ports_no_ports_in_content(
mock_loader, mock_expand_steps, options, module_args
):
mock_instance = MagicMock()
return_value = {
"payloads": [
{
"steps": [
{
"ports": [80, 443, 8080],
"response": {"conditions": {"service": {}}},
}
]
}
]
}
mock_instance.load.return_value = return_value
mock_loader.return_value = mock_instance
mock_expand_steps.side_effect = lambda x: x
module = Module("test_module", options, **module_args)
module.module_inputs = {"excluded_ports": None}
module.module_content = return_value
module.generate_loops()
expected_ports = [80, 443, 8080]
actual_ports = module.module_content["payloads"][0]["steps"][0]["ports"]
assert actual_ports == expected_ports