diff --git a/docs/Usage.md b/docs/Usage.md index d140cef5..6040e677 100644 --- a/docs/Usage.md +++ b/docs/Usage.md @@ -35,9 +35,10 @@ By using the `--help`/`-h` switch you can read the help menu in the CLI: [2024-09-26 07:51:09][+] 106 modules loaded ... usage: Nettacker [-L LANGUAGE] [-v] [--verbose-event] [-V] [-o REPORT_PATH_FILENAME] [--graph GRAPH_NAME] [-h] [-i TARGETS] [-l TARGETS_LIST] [-m SELECTED_MODULES] [--modules-extra-args MODULES_EXTRA_ARGS] - [--show-all-modules] [--profile PROFILES] [--show-all-profiles] [-x EXCLUDED_MODULES] [-u USERNAMES] - [-U USERNAMES_LIST] [-p PASSWORDS] [-P PASSWORDS_LIST] [-g PORTS] [--user-agent USER_AGENT] - [-T TIMEOUT] [-w TIME_SLEEP_BETWEEN_REQUESTS] [-r] [-s] [-d] [-t THREAD_PER_HOST] [-H HTTP_HEADER] + [--show-all-modules] [--profile PROFILES] [--show-all-profiles] [-x EXCLUDED_MODULES] + [-X EXCLUDED_PORTS] [-u USERNAMES] [-U USERNAMES_LIST] [-p PASSWORDS] [-P PASSWORDS_LIST] [-g PORTS] + [--user-agent USER_AGENT] [-T TIMEOUT] [-w TIME_SLEEP_BETWEEN_REQUESTS] [-r] [-s] [-d] [-t THREAD_PER_HOST] + [--show-all-modules] [--profile PROFILES] [--show-all-profiles] [-x EXCLUDED_MODULES] [-H HTTP_HEADER] [-M PARALLEL_MODULE_SCAN] [--set-hardware-usage SET_HARDWARE_USAGE] [-R SOCKS_PROXY] [--retries RETRIES] [--ping-before-scan] [-K SCAN_COMPARE_ID] [-J COMPARE_REPORT_PATH_FILENAME] [--start-api] [--api-host API_HOSTNAME] [--api-port API_PORT] [--api-debug-mode] @@ -87,6 +88,8 @@ Method: 'adobe_coldfusion_cve_2023_26360_vuln', 'apache_cve_2021_41773_vuln', 'apache_cve_2021_42013_vuln', 'apache_ofbiz_cve_2024_38856_vuln', 'apache_struts_vuln', 'aviatrix_cve_2021_40870_vuln', 'cisco_hyperflex_cve_2021_1497_vuln'] + -X EXCLUDED_PORTS, --exclude-ports + Ports to exclude (e.g. 80 || 80,443|| 1000-1300) -u USERNAMES, --usernames USERNAMES username(s) list, separate with "," -U USERNAMES_LIST, --users-list USERNAMES_LIST @@ -316,6 +319,7 @@ https://owasp.org ``` python nettacker.py -i 192.168.1.1,192.168.1.2-192.168.1.10,127.0.0.1,owasp.org,192.168.2.1/24 -m port_scan -g 20-100 -t 10 python nettacker.py -l targets.txt -m all -x port_scan -g 20-100 -t 5 -u root -p 123456,654321,123123 +python nettacker.py -l targets.txt -m all -t 100 -d -u root -p 12345,432123 -X 80 ``` * Here are some more command line examples: diff --git a/nettacker/config.py b/nettacker/config.py index 450bcd20..64af9def 100644 --- a/nettacker/config.py +++ b/nettacker/config.py @@ -134,6 +134,7 @@ class DefaultSettings(ConfigBase): """OWASP Nettacker Default Configuration""" excluded_modules = None + excluded_ports = None graph_name = "d3_tree_v2_graph" language = "en" modules_extra_args = None diff --git a/nettacker/core/arg_parser.py b/nettacker/core/arg_parser.py index 0fa42679..fab93c3d 100644 --- a/nettacker/core/arg_parser.py +++ b/nettacker/core/arg_parser.py @@ -254,6 +254,14 @@ class ArgParser(ArgumentParser): default=Config.settings.excluded_modules, help=_("exclude_scan_method").format(exclude_modules), ) + method_options.add_argument( + "-X", + "--exclude-ports", + action="store", + dest="excluded_ports", + default=Config.settings.excluded_ports, + help=_("exclude_ports"), + ) method_options.add_argument( "-u", "--usernames", @@ -649,21 +657,35 @@ class ArgParser(ArgumentParser): options.selected_modules.remove(excluded_module) # Check port(s) if options.ports: - tmp_ports = [] + tmp_ports = set() for port in options.ports.split(","): try: if "-" in port: for port_number in range( int(port.split("-")[0]), int(port.split("-")[1]) + 1 ): - if port_number not in tmp_ports: - tmp_ports.append(port_number) + tmp_ports.add(port_number) else: - if int(port) not in tmp_ports: - tmp_ports.append(int(port)) + tmp_ports.add(int(port)) except Exception: die_failure(_("ports_int")) - options.ports = tmp_ports + options.ports = list(tmp_ports) + # Check for excluded ports + if options.excluded_ports: + tmp_excluded_ports = set() + + for excluded_port in options.excluded_ports.split(","): + try: + if "-" in excluded_port: + for excluded_port_number in range( + int(excluded_port.split("-")[0]), int(excluded_port.split("-")[1]) + 1 + ): + tmp_excluded_ports.add(excluded_port_number) + else: + tmp_excluded_ports.add(int(excluded_port)) + except Exception: + die_failure(_("ports_int")) + options.excluded_ports = list(tmp_excluded_ports) if options.user_agent == "random_user_agent": options.user_agents = open(Config.path.user_agents_file).read().split("\n") diff --git a/nettacker/core/module.py b/nettacker/core/module.py index 2f389c43..17ab6060 100644 --- a/nettacker/core/module.py +++ b/nettacker/core/module.py @@ -109,6 +109,12 @@ class Module: index_payload += 1 def generate_loops(self): + if self.module_inputs["excluded_ports"]: + excluded_port_set = set(self.module_inputs["excluded_ports"]) + if self.module_content and "ports" in self.module_content["payloads"][0]["steps"][0]: + all_ports = self.module_content["payloads"][0]["steps"][0]["ports"] + all_ports[:] = [port for port in all_ports if port not in excluded_port_set] + self.module_content["payloads"] = expand_module_steps(self.module_content["payloads"]) def sort_loops(self): @@ -152,7 +158,6 @@ class Module: importlib.import_module(f"nettacker.core.lib.{library.lower()}"), f"{library.capitalize()}Engine", )() - for step in payload["steps"]: for sub_step in step: thread = Thread( diff --git a/nettacker/locale/en.yaml b/nettacker/locale/en.yaml index 0c3ab0ba..e6e29e44 100644 --- a/nettacker/locale/en.yaml +++ b/nettacker/locale/en.yaml @@ -126,4 +126,5 @@ compare_report_saved: "compare results saved in {0}" build_compare_report: "building compare report" finish_build_report: "Finished building compare report" user_wordlist: "Allows users to enter their own wordlist" +exclude_ports: "Ports to exclude (e.g. 80 || 80,443|| 1000-1300)" http_header: "Add custom HTTP headers to requests (format: 'key: value'). For multiple headers, use multiple -H flags" diff --git a/tests/core/test_exclude_ports.py b/tests/core/test_exclude_ports.py new file mode 100644 index 00000000..dc5b26de --- /dev/null +++ b/tests/core/test_exclude_ports.py @@ -0,0 +1,389 @@ +import json +from unittest.mock import patch, MagicMock + +import pytest + +from nettacker.core.module import Module + + +class DummyOptions: + def __init__(self): + self.modules_extra_args = {"foo": "bar"} + self.skip_service_discovery = False + self.time_sleep_between_requests = 0 + self.thread_per_host = 2 + + +@pytest.fixture +def options(): + return DummyOptions() + + +@pytest.fixture +def module_args(): + return { + "target": "127.0.0.1", + "scan_id": "scan123", + "process_number": 1, + "thread_number": 1, + "total_number_threads": 1, + } + + +@patch("nettacker.core.module.TemplateLoader") +def test_init_and_service_discovery_signature(mock_loader, options, module_args): + mock_instance = MagicMock() + mock_instance.load.return_value = { + "payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}] + } + mock_loader.return_value = mock_instance + + module = Module("port_scan", options, **module_args) + assert "http" in module.service_discovery_signatures + + +@patch("os.listdir", return_value=["http.py"]) +@patch("nettacker.core.module.find_events") +@patch("nettacker.core.module.TemplateLoader") +def test_load_with_service_discovery( + mock_loader, mock_find_events, mock_listdir, options, module_args +): + mock_loader_inst = MagicMock() + mock_loader_inst.load.return_value = { + "payloads": [ + { + "library": "http", + "steps": [{"response": {"conditions": {"service": {"http": {}}}}}], + } + ] + } + mock_loader.return_value = mock_loader_inst + + mock_find_events.return_value = [ + MagicMock(json_event='{"port": 80, "response": {"conditions_results": {"http": {}}}}') + ] + + module = Module("test_module", options, **module_args) + module.load() + + assert module.discovered_services == {"http": [80]} + assert len(module.module_content["payloads"]) == 1 + + +@patch("nettacker.core.module.find_events") +@patch("nettacker.core.module.TemplateLoader") +def test_sort_loops(mock_loader, mock_find_events, options, module_args): + mock_loader_inst = MagicMock() + mock_loader_inst.load.return_value = { + "payloads": [ + { + "library": "http", + "steps": [ + {"response": {"conditions": {"service": {}}}}, + { + "response": { + "conditions": {}, + "dependent_on_temp_event": True, + "save_to_temp_events_only": True, + } + }, + {"response": {"conditions": {}, "dependent_on_temp_event": True}}, + ], + } + ] + } + mock_loader.return_value = mock_loader_inst + + mock_event = MagicMock() + mock_event.json_event = json.dumps( + {"port": 80, "response": {"conditions_results": {"http": True}}} + ) + mock_find_events.return_value = [mock_event] + + module = Module("test_module", options, **module_args) + module.libraries = ["http"] + module.load() # Should not raise + + +@patch("nettacker.core.module.find_events") +@patch("nettacker.core.module.TemplateLoader") +def test_start_unsupported_library(mock_loader, mock_find_events, options, module_args): + mock_loader_inst = MagicMock() + mock_loader_inst.load.return_value = { + "payloads": [ + { + "library": "unsupported_lib", + "steps": [{"step_id": 1, "response": {"conditions": {"service": {}}}}], + } + ] + } + mock_loader.return_value = mock_loader_inst + + mock_event = MagicMock() + mock_event.json_event = json.dumps( + {"port": 1234, "response": {"conditions_results": {"unsupported_lib": True}}} + ) + mock_find_events.return_value = [mock_event] + + module = Module("test_module", options, **module_args) + module.libraries = ["http"] + module.service_discovery_signatures.append("unsupported_lib") + + module.load() + result = module.start() + + assert result is None + + +def template_loader_side_effect(name, inputs): + # NOT A TEST CASE + mock_instance = MagicMock() + + # as in inside Module.__init__ + if name == "port_scan": + mock_instance.load.return_value = { + "payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}] + } + # as in module.load() + elif name == "test_module": + mock_instance.load.return_value = { + "payloads": [ + { + "library": "http", + "steps": [ + [{"response": {"conditions": {"service": {}}}}], + [ + { + "response": { + "conditions": {}, + "dependent_on_temp_event": True, + "save_to_temp_events_only": True, + } + } + ], + [{"response": {"conditions": {}, "dependent_on_temp_event": True}}], + ], + } + ] + } + else: + raise ValueError(f"Unexpected module name: {name}") + + return mock_instance + + +@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda step, _: step) +@patch("nettacker.core.module.find_events") +@patch("nettacker.core.module.TemplateLoader") +def test_sort_loops_behavior(mock_loader_cls, mock_find_events, mock_parse, options, module_args): + # This one is painful + mock_loader_cls.side_effect = template_loader_side_effect + + mock_event = MagicMock() + mock_event.json_event = json.dumps( + {"port": 80, "response": {"conditions_results": {"http": True}}} + ) + mock_find_events.return_value = [mock_event] + + module = Module("test_module", options, **module_args) + module.libraries = ["http"] + module.load() + module.sort_loops() + + steps = module.module_content["payloads"][0]["steps"] + + assert steps[0][0]["response"]["conditions"] == {"service": {}} + assert steps[1][0]["response"]["dependent_on_temp_event"] is True + assert steps[1][0]["response"]["save_to_temp_events_only"] is True + assert steps[2][0]["response"]["dependent_on_temp_event"] is True + assert "save_to_temp_events_only" not in steps[2][0]["response"] + + +def start_test_loader_side_effect(name, inputs): + # HELPER for start test + mock_inst = MagicMock() + + if name == "port_scan": + mock_inst.load.return_value = { + "payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}] + } + elif name == "test_module": + mock_inst.load.return_value = { + "payloads": [ + { + "library": "http", + "steps": [[{"response": {}, "id": 1}], [{"response": {}, "id": 2}]], + } + ] + } + else: + raise ValueError(f"Unexpected module name: {name}") + + return mock_inst + + +@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda x, _: x) +@patch("nettacker.core.module.log") +@patch("nettacker.core.module.TemplateLoader") +@patch("nettacker.core.module.find_events") +def test_start_library_not_supported( + mock_find_events, + mock_loader_cls, + mock_log, + mock_parse, + module_args, +): + def loader_side_effect_specific(name, inputs): + mock_inst = MagicMock() + if name == "port_scan": + mock_inst.load.return_value = { + "payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}] + } + elif name == "test_module": + mock_inst.load.return_value = { + "payloads": [ + { + "library": "unsupported_lib", + "steps": [ + [{"id": 1}], + ], + } + ] + } + return mock_inst + + mock_loader_cls.side_effect = loader_side_effect_specific + + mock_event = MagicMock() + mock_event.json_event = json.dumps( + {"port": 80, "response": {"conditions_results": {"http": True}}} + ) + mock_find_events.return_value = [mock_event] + + # Had to add this small workaround + class DummyOptionsSpecific: + def __init__(self): + self.modules_extra_args = {} + self.skip_service_discovery = True + self.time_sleep_between_requests = 0 + self.thread_per_host = 2 + + options = DummyOptionsSpecific() + + module = Module("test_module", options, **module_args) + module.libraries = ["http"] + module.load() + + result = module.start() + + assert result is None + mock_log.warn.assert_called_once() + assert "unsupported_lib" in mock_log.warn.call_args[0][0] + + +@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda step, _: step) +@patch("nettacker.core.module.find_events") +@patch("nettacker.core.module.TemplateLoader") +def test_load_appends_port_to_existing_protocol( + mock_loader_cls, + mock_find_events, + mock_parse, + options, + module_args, +): + def loader_side_effect_specific(name, inputs): + mock_inst = MagicMock() + mock_inst.load.return_value = { + "payloads": [ + { + "library": "http", + "steps": [ + {"response": {"conditions": {"service": {}}}} # .load() requires no [] + ], + } + ] + } + return mock_inst + + mock_loader_cls.side_effect = loader_side_effect_specific + mock_find_events.return_value = [ + MagicMock( + json_event=json.dumps({"port": 80, "response": {"conditions_results": {"http": {}}}}) + ), + MagicMock( + json_event=json.dumps({"port": 443, "response": {"conditions_results": {"http": {}}}}) + ), + ] + + module = Module("test_module", options, **module_args) + module.libraries = ["http"] + module.service_discovery_signatures = ["http"] + module.load() + assert module.discovered_services == {"http": [80, 443]} + + +@patch("nettacker.core.module.expand_module_steps") +@patch("nettacker.core.module.TemplateLoader") +def test_generate_loops_with_excluded_ports_and_ports_in_content( + mock_loader, mock_expand_steps, options, module_args +): + mock_instance = MagicMock() + return_value = { + "payloads": [ + { + "steps": [ + { + "ports": [80, 443, 8080], + "response": {"conditions": {"service": {}}}, + } + ] + } + ] + } + mock_instance.load.return_value = return_value + mock_loader.return_value = mock_instance + + mock_expand_steps.side_effect = lambda x: x + + module = Module("test_module", options, **module_args) + module.module_inputs = {"excluded_ports": [443, 8080]} + module.module_content = return_value + module.generate_loops() + + expected_ports = [80] # 443 and 8080 should be excluded + actual_ports = module.module_content["payloads"][0]["steps"][0]["ports"] + assert actual_ports == expected_ports + + +@patch("nettacker.core.module.expand_module_steps") +@patch("nettacker.core.module.TemplateLoader") +def test_generate_loops_with_excluded_ports_no_ports_in_content( + mock_loader, mock_expand_steps, options, module_args +): + mock_instance = MagicMock() + return_value = { + "payloads": [ + { + "steps": [ + { + "ports": [80, 443, 8080], + "response": {"conditions": {"service": {}}}, + } + ] + } + ] + } + + mock_instance.load.return_value = return_value + mock_loader.return_value = mock_instance + + mock_expand_steps.side_effect = lambda x: x + + module = Module("test_module", options, **module_args) + module.module_inputs = {"excluded_ports": None} + module.module_content = return_value + module.generate_loops() + + expected_ports = [80, 443, 8080] + actual_ports = module.module_content["payloads"][0]["steps"][0]["ports"] + assert actual_ports == expected_ports