mirror of https://github.com/OWASP/Nettacker.git
390 lines
12 KiB
Python
390 lines
12 KiB
Python
import json
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
import pytest
|
|
|
|
from nettacker.core.module import Module
|
|
|
|
|
|
class DummyOptions:
    """Minimal stand-in for the options object handed to ``Module``.

    Only the four attributes set below are provided; every instance gets its
    own ``modules_extra_args`` dict.
    """

    def __init__(self):
        # Extra per-module arguments.
        self.modules_extra_args = {"foo": "bar"}
        # Service discovery stays enabled for tests using this object.
        self.skip_service_discovery = False
        # No delay between requests.
        self.time_sleep_between_requests = 0
        self.thread_per_host = 2
|
|
|
|
|
|
@pytest.fixture
def options():
    """Provide a fresh ``DummyOptions`` instance for each test."""
    dummy = DummyOptions()
    return dummy
|
|
|
|
|
|
@pytest.fixture
def module_args():
    """Provide the keyword arguments the tests splat into ``Module(...)``.

    A new dict is built on every call, so tests never share state through it.
    """
    kwargs = {
        "target": "127.0.0.1",
        "scan_id": "scan123",
        "process_number": 1,
        "thread_number": 1,
        "total_number_threads": 1,
    }
    return kwargs
|
|
|
|
|
|
@patch("nettacker.core.module.TemplateLoader")
def test_init_and_service_discovery_signature(mock_loader, options, module_args):
    """Constructing a Module exposes the services named by the loaded template.

    The patched TemplateLoader hands back a template whose first step carries
    an ``http`` entry under ``response.conditions.service``; the test expects
    ``"http"`` to appear in ``service_discovery_signatures``.
    """
    port_scan_template = {
        "payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
    }
    loader = MagicMock()
    loader.load.return_value = port_scan_template
    mock_loader.return_value = loader

    scanner = Module("port_scan", options, **module_args)

    assert "http" in scanner.service_discovery_signatures
|
|
|
|
|
|
@patch("os.listdir", return_value=["http.py"])
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_load_with_service_discovery(
    mock_loader, mock_find_events, mock_listdir, options, module_args
):
    """load() records the port reported for http and keeps the http payload.

    ``os.listdir`` is patched to report a single ``http.py`` file --
    presumably so the module's library list resolves to ``http`` (confirm
    against ``Module.__init__``).
    """
    http_payload = {
        "library": "http",
        "steps": [{"response": {"conditions": {"service": {"http": {}}}}}],
    }
    loader = MagicMock()
    loader.load.return_value = {"payloads": [http_payload]}
    mock_loader.return_value = loader

    # A single mocked event: http detected on port 80.
    port_80_event = MagicMock(
        json_event='{"port": 80, "response": {"conditions_results": {"http": {}}}}'
    )
    mock_find_events.return_value = [port_80_event]

    scanner = Module("test_module", options, **module_args)
    scanner.load()

    assert scanner.discovered_services == {"http": [80]}
    assert len(scanner.module_content["payloads"]) == 1
|
|
|
|
|
|
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_sort_loops(mock_loader, mock_find_events, options, module_args):
    """Smoke test: load() completes for a payload mixing three kinds of step.

    NOTE(review): despite its name this test never calls ``sort_loops()`` and
    makes no assertions; it only checks that ``load()`` does not raise.
    """
    service_step = {"response": {"conditions": {"service": {}}}}
    temp_only_step = {
        "response": {
            "conditions": {},
            "dependent_on_temp_event": True,
            "save_to_temp_events_only": True,
        }
    }
    dependent_step = {"response": {"conditions": {}, "dependent_on_temp_event": True}}

    loader = MagicMock()
    loader.load.return_value = {
        "payloads": [
            {
                "library": "http",
                "steps": [service_step, temp_only_step, dependent_step],
            }
        ]
    }
    mock_loader.return_value = loader

    # A single mocked event: http detected on port 80.
    event_body = {"port": 80, "response": {"conditions_results": {"http": True}}}
    port_event = MagicMock()
    port_event.json_event = json.dumps(event_body)
    mock_find_events.return_value = [port_event]

    scanner = Module("test_module", options, **module_args)
    scanner.libraries = ["http"]
    scanner.load()  # must complete without raising
|
|
|
|
|
|
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_start_unsupported_library(mock_loader, mock_find_events, options, module_args):
    """start() returns None when the only payload names ``unsupported_lib``.

    The module's ``libraries`` list is forced to ``["http"]`` while
    ``unsupported_lib`` is appended to the service discovery signatures and
    reported by the single mocked event on port 1234.
    """
    unsupported_payload = {
        "library": "unsupported_lib",
        "steps": [{"step_id": 1, "response": {"conditions": {"service": {}}}}],
    }
    loader = MagicMock()
    loader.load.return_value = {"payloads": [unsupported_payload]}
    mock_loader.return_value = loader

    port_event = MagicMock()
    port_event.json_event = json.dumps(
        {"port": 1234, "response": {"conditions_results": {"unsupported_lib": True}}}
    )
    mock_find_events.return_value = [port_event]

    scanner = Module("test_module", options, **module_args)
    scanner.libraries = ["http"]
    scanner.service_discovery_signatures.append("unsupported_lib")

    scanner.load()
    outcome = scanner.start()

    assert outcome is None
|
|
|
|
|
|
def template_loader_side_effect(name, inputs):
    """Stand-in for the ``TemplateLoader`` constructor (helper, not a test case).

    Returns a fresh mock whose ``load()`` yields the template matching *name*:
    ``"port_scan"`` (the lookup made inside ``Module.__init__``) or
    ``"test_module"`` (the lookup made by ``module.load()``). *inputs* is
    accepted but ignored.

    Raises:
        ValueError: if *name* is neither of the two expected template names.
    """
    if name not in ("port_scan", "test_module"):
        raise ValueError(f"Unexpected module name: {name}")

    if name == "port_scan":
        content = {
            "payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
        }
    else:
        # Each step is wrapped in its own single-element list.
        service_group = [{"response": {"conditions": {"service": {}}}}]
        temp_only_group = [
            {
                "response": {
                    "conditions": {},
                    "dependent_on_temp_event": True,
                    "save_to_temp_events_only": True,
                }
            }
        ]
        dependent_group = [{"response": {"conditions": {}, "dependent_on_temp_event": True}}]
        content = {
            "payloads": [
                {
                    "library": "http",
                    "steps": [service_group, temp_only_group, dependent_group],
                }
            ]
        }

    loader = MagicMock()
    loader.load.return_value = content
    return loader
|
|
|
|
|
|
@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda step, _: step)
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_sort_loops_behavior(mock_loader_cls, mock_find_events, mock_parse, options, module_args):
    """After load() and sort_loops() the three step groups keep this order.

    Expected order: the service-condition step, then the step flagged both
    ``dependent_on_temp_event`` and ``save_to_temp_events_only``, then the
    step flagged ``dependent_on_temp_event`` only.
    """
    # Route every TemplateLoader(...) construction through the shared helper.
    mock_loader_cls.side_effect = template_loader_side_effect

    port_event = MagicMock()
    port_event.json_event = json.dumps(
        {"port": 80, "response": {"conditions_results": {"http": True}}}
    )
    mock_find_events.return_value = [port_event]

    scanner = Module("test_module", options, **module_args)
    scanner.libraries = ["http"]
    scanner.load()
    scanner.sort_loops()

    steps = scanner.module_content["payloads"][0]["steps"]

    first_response = steps[0][0]["response"]
    assert first_response["conditions"] == {"service": {}}

    second_response = steps[1][0]["response"]
    assert second_response["dependent_on_temp_event"] is True
    assert second_response["save_to_temp_events_only"] is True

    third_response = steps[2][0]["response"]
    assert third_response["dependent_on_temp_event"] is True
    assert "save_to_temp_events_only" not in third_response
|
|
|
|
|
|
def start_test_loader_side_effect(name, inputs):
    """Stand-in for the ``TemplateLoader`` constructor, meant for start() tests.

    Returns a fresh mock whose ``load()`` yields the ``"port_scan"`` template
    or a two-step ``"test_module"`` template for the ``http`` library.
    *inputs* is accepted but ignored.

    Raises:
        ValueError: if *name* is neither of the two expected template names.
    """
    if name not in ("port_scan", "test_module"):
        raise ValueError(f"Unexpected module name: {name}")

    if name == "port_scan":
        content = {
            "payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
        }
    else:
        http_payload = {
            "library": "http",
            "steps": [[{"response": {}, "id": 1}], [{"response": {}, "id": 2}]],
        }
        content = {"payloads": [http_payload]}

    loader = MagicMock()
    loader.load.return_value = content
    return loader
|
|
|
|
|
|
@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda x, _: x)
@patch("nettacker.core.module.log")
@patch("nettacker.core.module.TemplateLoader")
@patch("nettacker.core.module.find_events")
def test_start_library_not_supported(
    mock_find_events,
    mock_loader_cls,
    mock_log,
    mock_parse,
    module_args,
):
    """start() warns once and returns None for a payload with an unknown library.

    The module template names ``unsupported_lib`` while the module's
    ``libraries`` list is forced to ``["http"]``. The test expects ``start()``
    to return ``None`` and ``log.warn`` to be called exactly once with a
    message mentioning ``unsupported_lib``.
    """

    def loader_side_effect_specific(name, inputs):
        """Return a loader mock for *name*; reject unexpected template names."""
        mock_inst = MagicMock()
        if name == "port_scan":
            mock_inst.load.return_value = {
                "payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
            }
        elif name == "test_module":
            mock_inst.load.return_value = {
                "payloads": [
                    {
                        "library": "unsupported_lib",
                        "steps": [
                            [{"id": 1}],
                        ],
                    }
                ]
            }
        else:
            # Fail fast instead of silently handing back a bare MagicMock
            # template, which would hide an unexpected template lookup.
            raise ValueError(f"Unexpected module name: {name}")
        return mock_inst

    mock_loader_cls.side_effect = loader_side_effect_specific

    mock_event = MagicMock()
    mock_event.json_event = json.dumps(
        {"port": 80, "response": {"conditions_results": {"http": True}}}
    )
    mock_find_events.return_value = [mock_event]

    # Local options variant: unlike the shared fixture it sets
    # skip_service_discovery to True and passes no extra module args --
    # presumably so load() leaves the unsupported payload in place
    # (confirm against Module.load).
    class DummyOptionsSpecific:
        def __init__(self):
            self.modules_extra_args = {}
            self.skip_service_discovery = True
            self.time_sleep_between_requests = 0
            self.thread_per_host = 2

    options = DummyOptionsSpecific()

    module = Module("test_module", options, **module_args)
    module.libraries = ["http"]
    module.load()

    result = module.start()

    assert result is None
    mock_log.warn.assert_called_once()
    assert "unsupported_lib" in mock_log.warn.call_args[0][0]
|
|
|
|
|
|
@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda step, _: step)
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_load_appends_port_to_existing_protocol(
    mock_loader_cls,
    mock_find_events,
    mock_parse,
    options,
    module_args,
):
    """Two events for the same protocol end up as one entry listing both ports."""

    def loader_side_effect_specific(name, inputs):
        # The same template is served for every requested name.
        http_payload = {
            "library": "http",
            # load() is fed plain step dicts here, not steps wrapped in lists.
            "steps": [{"response": {"conditions": {"service": {}}}}],
        }
        loader = MagicMock()
        loader.load.return_value = {"payloads": [http_payload]}
        return loader

    mock_loader_cls.side_effect = loader_side_effect_specific

    # http reported on port 80 and then on port 443, in that order.
    mock_find_events.return_value = [
        MagicMock(
            json_event=json.dumps({"port": port, "response": {"conditions_results": {"http": {}}}})
        )
        for port in (80, 443)
    ]

    scanner = Module("test_module", options, **module_args)
    scanner.libraries = ["http"]
    scanner.service_discovery_signatures = ["http"]
    scanner.load()

    assert scanner.discovered_services == {"http": [80, 443]}
|
|
|
|
|
|
@patch("nettacker.core.module.expand_module_steps")
@patch("nettacker.core.module.TemplateLoader")
def test_generate_loops_with_excluded_ports_and_ports_in_content(
    mock_loader, mock_expand_steps, options, module_args
):
    """generate_loops() drops excluded ports from the first step's port list."""
    first_step = {
        "ports": [80, 443, 8080],
        "response": {"conditions": {"service": {}}},
    }
    content = {"payloads": [{"steps": [first_step]}]}

    loader = MagicMock()
    loader.load.return_value = content
    mock_loader.return_value = loader

    # Step expansion is replaced by a pass-through so only the port filtering
    # is observed.
    mock_expand_steps.side_effect = lambda x: x

    scanner = Module("test_module", options, **module_args)
    scanner.module_inputs = {"excluded_ports": [443, 8080]}
    scanner.module_content = content
    scanner.generate_loops()

    remaining_ports = scanner.module_content["payloads"][0]["steps"][0]["ports"]
    # 443 and 8080 are excluded, leaving only 80.
    assert remaining_ports == [80]
|
|
|
|
|
|
@patch("nettacker.core.module.expand_module_steps")
@patch("nettacker.core.module.TemplateLoader")
def test_generate_loops_with_excluded_ports_no_ports_in_content(
    mock_loader, mock_expand_steps, options, module_args
):
    """generate_loops() leaves the port list alone when excluded_ports is None.

    NOTE(review): despite the name, the payload used here does define
    ``ports``; what differs from the sibling test is that ``excluded_ports``
    is ``None``.
    """
    first_step = {
        "ports": [80, 443, 8080],
        "response": {"conditions": {"service": {}}},
    }
    content = {"payloads": [{"steps": [first_step]}]}

    loader = MagicMock()
    loader.load.return_value = content
    mock_loader.return_value = loader

    # Step expansion is replaced by a pass-through.
    mock_expand_steps.side_effect = lambda x: x

    scanner = Module("test_module", options, **module_args)
    scanner.module_inputs = {"excluded_ports": None}
    scanner.module_content = content
    scanner.generate_loops()

    remaining_ports = scanner.module_content["payloads"][0]["steps"][0]["ports"]
    assert remaining_ports == [80, 443, 8080]
|