mirror of https://github.com/OWASP/Nettacker.git
Compare commits
12 Commits
d1c4921ec4
...
ae06a003b6
| Author | SHA1 | Date |
|---|---|---|
|
|
ae06a003b6 | |
|
|
65bf88e68f | |
|
|
19bbf8a838 | |
|
|
e276a374c6 | |
|
|
64dd5550d0 | |
|
|
62a9a3bdc0 | |
|
|
cb4891d582 | |
|
|
d693fc10f4 | |
|
|
13b314c2af | |
|
|
934f83558e | |
|
|
b2929c88e0 | |
|
|
e5046803dd |
|
|
@ -1092,4 +1092,8 @@ payloads:
|
|||
|
||||
amqp:
|
||||
regex: "AMQP"
|
||||
reverse: false
|
||||
reverse: false
|
||||
|
||||
smb:
|
||||
regex: "SMB\\d+|Microsoft Windows Network|Server\\sMessage\\sBlock\\sProtocol|\\d{{1,3}}\\.\\d{{1,3}}\\.\\d{{1,3}}\\.\\d{{1,3}}.*?SMB.*?|Session\\sError|Not\\simplemented|Protocol\\sViolation|\\d+\\sbytes\\sreceived|SMB\\sConnection\\sterminated|Session\\sestablished\\susing\\sSMB\\d+|NTLMv2|Negotiate Protocol|SMB2\\sProtocol\\sNegotiation|Session\\sSetup\\sSMB|Tree\\sConnect"
|
||||
reverse: false
|
||||
|
|
|
|||
|
|
@ -0,0 +1,381 @@
|
|||
import json
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from nettacker.core.module import Module
|
||||
|
||||
|
||||
class DummyOptions:
|
||||
def __init__(self):
|
||||
self.modules_extra_args = {"foo": "bar"}
|
||||
self.skip_service_discovery = False
|
||||
self.thread_per_host = 2
|
||||
self.time_sleep_between_requests = 0
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def options():
|
||||
return DummyOptions()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def module_args():
|
||||
return {
|
||||
"process_number": 1,
|
||||
"scan_id": "scan123",
|
||||
"target": "127.0.0.1",
|
||||
"thread_number": 1,
|
||||
"total_number_threads": 1,
|
||||
}
|
||||
|
||||
|
||||
@patch("nettacker.core.module.TemplateLoader")
|
||||
def test_init_and_service_discovery_signature(mock_loader, options, module_args):
|
||||
mock_instance = MagicMock()
|
||||
mock_instance.load.return_value = {
|
||||
"payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
|
||||
}
|
||||
mock_loader.return_value = mock_instance
|
||||
|
||||
module = Module("port_scan", options, **module_args)
|
||||
assert "http" in module.service_discovery_signatures
|
||||
|
||||
|
||||
@patch("os.listdir", return_value=["http.py"])
|
||||
@patch("nettacker.core.module.find_events")
|
||||
@patch("nettacker.core.module.TemplateLoader")
|
||||
def test_load_with_service_discovery(
|
||||
mock_loader, mock_find_events, mock_listdir, options, module_args
|
||||
):
|
||||
mock_loader_inst = MagicMock()
|
||||
mock_loader_inst.load.return_value = {
|
||||
"payloads": [
|
||||
{
|
||||
"library": "http",
|
||||
"steps": [{"response": {"conditions": {"service": {"http": {}}}}}],
|
||||
}
|
||||
]
|
||||
}
|
||||
mock_loader.return_value = mock_loader_inst
|
||||
|
||||
mock_find_events.return_value = [
|
||||
MagicMock(json_event='{"port": 80, "response": {"conditions_results": {"http": {}}}}')
|
||||
]
|
||||
|
||||
module = Module("test_module", options, **module_args)
|
||||
module.load()
|
||||
|
||||
assert module.discovered_services == {"http": [80]}
|
||||
assert len(module.module_content["payloads"]) == 1
|
||||
|
||||
|
||||
@patch("nettacker.core.module.find_events")
|
||||
@patch("nettacker.core.module.importlib.import_module")
|
||||
@patch("nettacker.core.module.wait_for_threads_to_finish")
|
||||
@patch("nettacker.core.module.time.sleep", return_value=None)
|
||||
@patch("nettacker.core.module.Thread")
|
||||
@patch("nettacker.core.module.TemplateLoader")
|
||||
def test_start_all_conditions(
|
||||
mock_loader,
|
||||
mock_thread,
|
||||
mock_sleep,
|
||||
mock_wait,
|
||||
mock_import,
|
||||
mock_find_events,
|
||||
options,
|
||||
module_args,
|
||||
):
|
||||
engine_mock = MagicMock()
|
||||
mock_import.return_value = MagicMock(HttpEngine=MagicMock(return_value=engine_mock))
|
||||
|
||||
mock_loader_inst = MagicMock()
|
||||
mock_loader_inst.load.return_value = {
|
||||
"payloads": [
|
||||
{
|
||||
"library": "http",
|
||||
"steps": [{"step_id": 1, "response": {"conditions": {"service": {}}}}],
|
||||
}
|
||||
]
|
||||
}
|
||||
mock_loader.return_value = mock_loader_inst
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_event.json_event = json.dumps(
|
||||
{"port": 80, "response": {"conditions_results": {"http": True}}}
|
||||
)
|
||||
mock_find_events.return_value = [mock_event]
|
||||
|
||||
module = Module("test_module", options, **module_args)
|
||||
module.libraries = ["http"]
|
||||
module.load()
|
||||
module.start()
|
||||
|
||||
mock_wait.assert_called()
|
||||
|
||||
|
||||
@patch("nettacker.core.module.find_events")
|
||||
@patch("nettacker.core.module.TemplateLoader")
|
||||
def test_start_unsupported_library(mock_loader, mock_find_events, options, module_args):
|
||||
mock_loader_inst = MagicMock()
|
||||
mock_loader_inst.load.return_value = {
|
||||
"payloads": [
|
||||
{
|
||||
"library": "unsupported_lib",
|
||||
"steps": [{"step_id": 1, "response": {"conditions": {"service": {}}}}],
|
||||
}
|
||||
]
|
||||
}
|
||||
mock_loader.return_value = mock_loader_inst
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_event.json_event = json.dumps(
|
||||
{"port": 1234, "response": {"conditions_results": {"unsupported_lib": True}}}
|
||||
)
|
||||
mock_find_events.return_value = [mock_event]
|
||||
|
||||
module = Module("test_module", options, **module_args)
|
||||
module.libraries = ["http"]
|
||||
module.service_discovery_signatures.append("unsupported_lib")
|
||||
|
||||
module.load()
|
||||
result = module.start()
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def template_loader_side_effect(name, inputs):
|
||||
# NOT A TEST CASE
|
||||
mock_instance = MagicMock()
|
||||
|
||||
# as in inside Module.__init__
|
||||
if name == "port_scan":
|
||||
mock_instance.load.return_value = {
|
||||
"payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
|
||||
}
|
||||
# as in module.load()
|
||||
elif name == "test_module":
|
||||
mock_instance.load.return_value = {
|
||||
"payloads": [
|
||||
{
|
||||
"library": "http",
|
||||
"steps": [
|
||||
[{"response": {"conditions": {"service": {}}}}],
|
||||
[
|
||||
{
|
||||
"response": {
|
||||
"conditions": {},
|
||||
"dependent_on_temp_event": True,
|
||||
"save_to_temp_events_only": True,
|
||||
}
|
||||
}
|
||||
],
|
||||
[{"response": {"conditions": {}, "dependent_on_temp_event": True}}],
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
else:
|
||||
raise ValueError(f"Unexpected module name: {name}")
|
||||
|
||||
return mock_instance
|
||||
|
||||
|
||||
@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda step, _: step)
|
||||
@patch("nettacker.core.module.find_events")
|
||||
@patch("nettacker.core.module.TemplateLoader")
|
||||
def test_sort_loops_behavior(mock_loader_cls, mock_find_events, mock_parse, options, module_args):
|
||||
# This one is painful
|
||||
mock_loader_cls.side_effect = template_loader_side_effect
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_event.json_event = json.dumps(
|
||||
{"port": 80, "response": {"conditions_results": {"http": True}}}
|
||||
)
|
||||
mock_find_events.return_value = [mock_event]
|
||||
|
||||
module = Module("test_module", options, **module_args)
|
||||
module.libraries = ["http"]
|
||||
module.load()
|
||||
module.sort_loops()
|
||||
|
||||
steps = module.module_content["payloads"][0]["steps"]
|
||||
|
||||
assert steps[0][0]["response"]["conditions"] == {"service": {}}
|
||||
assert steps[1][0]["response"]["dependent_on_temp_event"]
|
||||
assert steps[1][0]["response"]["save_to_temp_events_only"]
|
||||
assert steps[2][0]["response"]["dependent_on_temp_event"]
|
||||
assert "save_to_temp_events_only" not in steps[2][0]["response"]
|
||||
|
||||
|
||||
def loader_side_effect(name, inputs):
|
||||
# HELPER
|
||||
mock_inst = MagicMock()
|
||||
|
||||
if name == "port_scan":
|
||||
mock_inst.load.return_value = {
|
||||
"payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
|
||||
}
|
||||
elif name == "test_module":
|
||||
mock_inst.load.return_value = {
|
||||
"payloads": [
|
||||
{
|
||||
"library": "http",
|
||||
"steps": [
|
||||
[{"id": 1}],
|
||||
[{"id": 2}],
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
else:
|
||||
raise ValueError(f"Unexpected module name: {name}")
|
||||
|
||||
return mock_inst
|
||||
|
||||
|
||||
@patch("nettacker.core.module.find_events", return_value=[])
|
||||
@patch("nettacker.core.module.Thread")
|
||||
@patch("nettacker.core.module.importlib.import_module")
|
||||
@patch("nettacker.core.module.time.sleep", return_value=None)
|
||||
@patch("nettacker.core.module.wait_for_threads_to_finish")
|
||||
def test_start_creates_threads_minimal(
|
||||
mock_wait,
|
||||
mock_sleep,
|
||||
mock_import_module,
|
||||
mock_thread_cls,
|
||||
mock_find_events,
|
||||
options,
|
||||
module_args,
|
||||
):
|
||||
# Mock HttpEngine from the imported module
|
||||
fake_engine = MagicMock()
|
||||
mock_import_module.return_value = MagicMock(HttpEngine=MagicMock(return_value=fake_engine))
|
||||
|
||||
# Mock thread instances
|
||||
mock_thread_instance = MagicMock()
|
||||
mock_thread_cls.return_value = mock_thread_instance
|
||||
|
||||
# Create module with mocked attributes
|
||||
module = Module("test_module", options, **module_args)
|
||||
module.libraries = ["http"]
|
||||
module.discovered_services = {"http": [80]}
|
||||
module.service_discovery_signatures = ["http"]
|
||||
|
||||
module.module_content = {
|
||||
"payloads": [
|
||||
{
|
||||
"library": "http",
|
||||
"steps": [
|
||||
[{"response": {}, "id": 1}],
|
||||
[{"response": {}, "id": 2}],
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
# Run
|
||||
module.start()
|
||||
|
||||
# Assert threads created twice
|
||||
assert mock_thread_cls.call_count == 2
|
||||
|
||||
# Collect actual IDs passed to Thread
|
||||
expected_ids = {1, 2}
|
||||
actual_ids = {kwargs["args"][0]["id"] for _, kwargs in mock_thread_cls.call_args_list}
|
||||
|
||||
assert actual_ids == expected_ids
|
||||
assert mock_thread_instance.start.call_count == 2
|
||||
|
||||
|
||||
@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda x, _: x)
|
||||
@patch("nettacker.core.module.log")
|
||||
@patch("nettacker.core.module.TemplateLoader")
|
||||
@patch("nettacker.core.module.find_events")
|
||||
def test_start_library_not_supported(
|
||||
mock_find_events,
|
||||
mock_loader_cls,
|
||||
mock_log,
|
||||
mock_parse,
|
||||
module_args,
|
||||
):
|
||||
def loader_side_effect_specific(name, inputs):
|
||||
mock_inst = MagicMock()
|
||||
if name == "port_scan":
|
||||
mock_inst.load.return_value = {
|
||||
"payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
|
||||
}
|
||||
elif name == "test_module":
|
||||
mock_inst.load.return_value = {
|
||||
"payloads": [
|
||||
{
|
||||
"library": "unsupported_lib",
|
||||
"steps": [
|
||||
[{"id": 1}],
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
return mock_inst
|
||||
|
||||
mock_loader_cls.side_effect = loader_side_effect_specific
|
||||
|
||||
mock_event = MagicMock()
|
||||
mock_event.json_event = json.dumps(
|
||||
{"port": 80, "response": {"conditions_results": {"http": True}}}
|
||||
)
|
||||
mock_find_events.return_value = [mock_event]
|
||||
|
||||
options.modules_extra_args = {}
|
||||
options.skip_service_discovery = True
|
||||
|
||||
module = Module("test_module", options, **module_args)
|
||||
module.libraries = ["http"]
|
||||
module.load()
|
||||
|
||||
result = module.start()
|
||||
|
||||
assert result is None
|
||||
mock_log.warn.assert_called_once()
|
||||
assert "unsupported_lib" in mock_log.warn.call_args[0][0]
|
||||
|
||||
|
||||
@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda step, _: step)
|
||||
@patch("nettacker.core.module.find_events")
|
||||
@patch("nettacker.core.module.TemplateLoader")
|
||||
def test_load_appends_port_to_existing_protocol(
|
||||
mock_loader_cls,
|
||||
mock_find_events,
|
||||
mock_parse,
|
||||
options,
|
||||
module_args,
|
||||
):
|
||||
def loader_side_effect_specific(name, inputs):
|
||||
mock_inst = MagicMock()
|
||||
mock_inst.load.return_value = {
|
||||
"payloads": [
|
||||
{
|
||||
"library": "http",
|
||||
"steps": [
|
||||
{"response": {"conditions": {"service": {}}}} # .load() requires no []
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
return mock_inst
|
||||
|
||||
mock_loader_cls.side_effect = loader_side_effect_specific
|
||||
mock_find_events.return_value = [
|
||||
MagicMock(
|
||||
json_event=json.dumps({"port": 80, "response": {"conditions_results": {"http": {}}}})
|
||||
),
|
||||
MagicMock(
|
||||
json_event=json.dumps({"port": 443, "response": {"conditions_results": {"http": {}}}})
|
||||
),
|
||||
]
|
||||
|
||||
module = Module("test_module", options, **module_args)
|
||||
module.libraries = ["http"]
|
||||
module.service_discovery_signatures = ["http"]
|
||||
module.load()
|
||||
assert module.discovered_services == {"http": [80, 443]}
|
||||
Loading…
Reference in New Issue