Compare commits

...

12 Commits

Author SHA1 Message Date
Achintya Jai ae06a003b6
Merge 19bbf8a838 into 65bf88e68f 2025-11-23 16:12:09 +01:00
Aarush 65bf88e68f
Fix #1147: Add SMB signature to port.yaml (#1162)
* Add regex for SMB protocol in port.yaml to fix the bug #1147

Signed-off-by: Aarush289 <cs24b064@smail.iitm.ac.in>

* smb_fix done

* Enable SMB regex match in port.yaml to fix the bug #1147

Signed-off-by: Aarush289 <cs24b064@smail.iitm.ac.in>

---------

Signed-off-by: Aarush289 <cs24b064@smail.iitm.ac.in>
Co-authored-by: Sam Stepanyan <sam.stepanyan@owasp.org>
2025-11-18 15:50:22 +00:00
Arkadii Yakovets 19bbf8a838
Merge branch 'master' into pr/pUrGe12/1095 2025-09-02 15:46:48 -07:00
Arkadii Yakovets e276a374c6
Merge branch 'master' into pr/pUrGe12/1095 2025-09-02 15:41:18 -07:00
pUrGe12 64dd5550d0 maybe patching find_events will help 2025-09-02 18:24:53 +05:30
pUrGe12 62a9a3bdc0 updated path structure inside config 2025-09-02 18:02:13 +05:30
pUrGe12 cb4891d582 rebased and bot suggestions 2025-09-02 17:49:24 +05:30
Arkadii Yakovets d693fc10f4
Update tests 2025-08-29 21:04:24 -07:00
Arkadii Yakovets 13b314c2af
Merge branch 'master' into pr/pUrGe12/1095 2025-08-29 21:00:12 -07:00
Sam Stepanyan 934f83558e
Merge branch 'master' into tests-module 2025-07-30 23:15:49 +01:00
pUrGe12 b2929c88e0 removing complexity from the threading test case 2025-07-04 01:53:10 +05:30
pUrGe12 e5046803dd test cases for module.py 2025-07-04 01:25:05 +05:30
2 changed files with 386 additions and 1 deletions

View File

@ -1092,4 +1092,8 @@ payloads:
amqp:
regex: "AMQP"
reverse: false
reverse: false
smb:
regex: "SMB\\d+|Microsoft Windows Network|Server\\sMessage\\sBlock\\sProtocol|\\d{{1,3}}\\.\\d{{1,3}}\\.\\d{{1,3}}\\.\\d{{1,3}}.*?SMB.*?|Session\\sError|Not\\simplemented|Protocol\\sViolation|\\d+\\sbytes\\sreceived|SMB\\sConnection\\sterminated|Session\\sestablished\\susing\\sSMB\\d+|NTLMv2|Negotiate Protocol|SMB2\\sProtocol\\sNegotiation|Session\\sSetup\\sSMB|Tree\\sConnect"
reverse: false

381
tests/core/test_module.py Normal file
View File

@ -0,0 +1,381 @@
import json
from unittest.mock import patch, MagicMock
import pytest
from nettacker.core.module import Module
class DummyOptions:
def __init__(self):
self.modules_extra_args = {"foo": "bar"}
self.skip_service_discovery = False
self.thread_per_host = 2
self.time_sleep_between_requests = 0
@pytest.fixture
def options():
return DummyOptions()
@pytest.fixture
def module_args():
return {
"process_number": 1,
"scan_id": "scan123",
"target": "127.0.0.1",
"thread_number": 1,
"total_number_threads": 1,
}
@patch("nettacker.core.module.TemplateLoader")
def test_init_and_service_discovery_signature(mock_loader, options, module_args):
mock_instance = MagicMock()
mock_instance.load.return_value = {
"payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
}
mock_loader.return_value = mock_instance
module = Module("port_scan", options, **module_args)
assert "http" in module.service_discovery_signatures
@patch("os.listdir", return_value=["http.py"])
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_load_with_service_discovery(
mock_loader, mock_find_events, mock_listdir, options, module_args
):
mock_loader_inst = MagicMock()
mock_loader_inst.load.return_value = {
"payloads": [
{
"library": "http",
"steps": [{"response": {"conditions": {"service": {"http": {}}}}}],
}
]
}
mock_loader.return_value = mock_loader_inst
mock_find_events.return_value = [
MagicMock(json_event='{"port": 80, "response": {"conditions_results": {"http": {}}}}')
]
module = Module("test_module", options, **module_args)
module.load()
assert module.discovered_services == {"http": [80]}
assert len(module.module_content["payloads"]) == 1
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.importlib.import_module")
@patch("nettacker.core.module.wait_for_threads_to_finish")
@patch("nettacker.core.module.time.sleep", return_value=None)
@patch("nettacker.core.module.Thread")
@patch("nettacker.core.module.TemplateLoader")
def test_start_all_conditions(
mock_loader,
mock_thread,
mock_sleep,
mock_wait,
mock_import,
mock_find_events,
options,
module_args,
):
engine_mock = MagicMock()
mock_import.return_value = MagicMock(HttpEngine=MagicMock(return_value=engine_mock))
mock_loader_inst = MagicMock()
mock_loader_inst.load.return_value = {
"payloads": [
{
"library": "http",
"steps": [{"step_id": 1, "response": {"conditions": {"service": {}}}}],
}
]
}
mock_loader.return_value = mock_loader_inst
mock_event = MagicMock()
mock_event.json_event = json.dumps(
{"port": 80, "response": {"conditions_results": {"http": True}}}
)
mock_find_events.return_value = [mock_event]
module = Module("test_module", options, **module_args)
module.libraries = ["http"]
module.load()
module.start()
mock_wait.assert_called()
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_start_unsupported_library(mock_loader, mock_find_events, options, module_args):
mock_loader_inst = MagicMock()
mock_loader_inst.load.return_value = {
"payloads": [
{
"library": "unsupported_lib",
"steps": [{"step_id": 1, "response": {"conditions": {"service": {}}}}],
}
]
}
mock_loader.return_value = mock_loader_inst
mock_event = MagicMock()
mock_event.json_event = json.dumps(
{"port": 1234, "response": {"conditions_results": {"unsupported_lib": True}}}
)
mock_find_events.return_value = [mock_event]
module = Module("test_module", options, **module_args)
module.libraries = ["http"]
module.service_discovery_signatures.append("unsupported_lib")
module.load()
result = module.start()
assert result is None
def template_loader_side_effect(name, inputs):
# NOT A TEST CASE
mock_instance = MagicMock()
# as in inside Module.__init__
if name == "port_scan":
mock_instance.load.return_value = {
"payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
}
# as in module.load()
elif name == "test_module":
mock_instance.load.return_value = {
"payloads": [
{
"library": "http",
"steps": [
[{"response": {"conditions": {"service": {}}}}],
[
{
"response": {
"conditions": {},
"dependent_on_temp_event": True,
"save_to_temp_events_only": True,
}
}
],
[{"response": {"conditions": {}, "dependent_on_temp_event": True}}],
],
}
]
}
else:
raise ValueError(f"Unexpected module name: {name}")
return mock_instance
@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda step, _: step)
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_sort_loops_behavior(mock_loader_cls, mock_find_events, mock_parse, options, module_args):
# This one is painful
mock_loader_cls.side_effect = template_loader_side_effect
mock_event = MagicMock()
mock_event.json_event = json.dumps(
{"port": 80, "response": {"conditions_results": {"http": True}}}
)
mock_find_events.return_value = [mock_event]
module = Module("test_module", options, **module_args)
module.libraries = ["http"]
module.load()
module.sort_loops()
steps = module.module_content["payloads"][0]["steps"]
assert steps[0][0]["response"]["conditions"] == {"service": {}}
assert steps[1][0]["response"]["dependent_on_temp_event"]
assert steps[1][0]["response"]["save_to_temp_events_only"]
assert steps[2][0]["response"]["dependent_on_temp_event"]
assert "save_to_temp_events_only" not in steps[2][0]["response"]
def loader_side_effect(name, inputs):
# HELPER
mock_inst = MagicMock()
if name == "port_scan":
mock_inst.load.return_value = {
"payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
}
elif name == "test_module":
mock_inst.load.return_value = {
"payloads": [
{
"library": "http",
"steps": [
[{"id": 1}],
[{"id": 2}],
],
}
]
}
else:
raise ValueError(f"Unexpected module name: {name}")
return mock_inst
@patch("nettacker.core.module.find_events", return_value=[])
@patch("nettacker.core.module.Thread")
@patch("nettacker.core.module.importlib.import_module")
@patch("nettacker.core.module.time.sleep", return_value=None)
@patch("nettacker.core.module.wait_for_threads_to_finish")
def test_start_creates_threads_minimal(
mock_wait,
mock_sleep,
mock_import_module,
mock_thread_cls,
mock_find_events,
options,
module_args,
):
# Mock HttpEngine from the imported module
fake_engine = MagicMock()
mock_import_module.return_value = MagicMock(HttpEngine=MagicMock(return_value=fake_engine))
# Mock thread instances
mock_thread_instance = MagicMock()
mock_thread_cls.return_value = mock_thread_instance
# Create module with mocked attributes
module = Module("test_module", options, **module_args)
module.libraries = ["http"]
module.discovered_services = {"http": [80]}
module.service_discovery_signatures = ["http"]
module.module_content = {
"payloads": [
{
"library": "http",
"steps": [
[{"response": {}, "id": 1}],
[{"response": {}, "id": 2}],
],
}
]
}
# Run
module.start()
# Assert threads created twice
assert mock_thread_cls.call_count == 2
# Collect actual IDs passed to Thread
expected_ids = {1, 2}
actual_ids = {kwargs["args"][0]["id"] for _, kwargs in mock_thread_cls.call_args_list}
assert actual_ids == expected_ids
assert mock_thread_instance.start.call_count == 2
@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda x, _: x)
@patch("nettacker.core.module.log")
@patch("nettacker.core.module.TemplateLoader")
@patch("nettacker.core.module.find_events")
def test_start_library_not_supported(
mock_find_events,
mock_loader_cls,
mock_log,
mock_parse,
module_args,
):
def loader_side_effect_specific(name, inputs):
mock_inst = MagicMock()
if name == "port_scan":
mock_inst.load.return_value = {
"payloads": [{"steps": [{"response": {"conditions": {"service": {"http": {}}}}}]}]
}
elif name == "test_module":
mock_inst.load.return_value = {
"payloads": [
{
"library": "unsupported_lib",
"steps": [
[{"id": 1}],
],
}
]
}
return mock_inst
mock_loader_cls.side_effect = loader_side_effect_specific
mock_event = MagicMock()
mock_event.json_event = json.dumps(
{"port": 80, "response": {"conditions_results": {"http": True}}}
)
mock_find_events.return_value = [mock_event]
options.modules_extra_args = {}
options.skip_service_discovery = True
module = Module("test_module", options, **module_args)
module.libraries = ["http"]
module.load()
result = module.start()
assert result is None
mock_log.warn.assert_called_once()
assert "unsupported_lib" in mock_log.warn.call_args[0][0]
@patch("nettacker.core.module.TemplateLoader.parse", side_effect=lambda step, _: step)
@patch("nettacker.core.module.find_events")
@patch("nettacker.core.module.TemplateLoader")
def test_load_appends_port_to_existing_protocol(
mock_loader_cls,
mock_find_events,
mock_parse,
options,
module_args,
):
def loader_side_effect_specific(name, inputs):
mock_inst = MagicMock()
mock_inst.load.return_value = {
"payloads": [
{
"library": "http",
"steps": [
{"response": {"conditions": {"service": {}}}} # .load() requires no []
],
}
]
}
return mock_inst
mock_loader_cls.side_effect = loader_side_effect_specific
mock_find_events.return_value = [
MagicMock(
json_event=json.dumps({"port": 80, "response": {"conditions_results": {"http": {}}}})
),
MagicMock(
json_event=json.dumps({"port": 443, "response": {"conditions_results": {"http": {}}}})
),
]
module = Module("test_module", options, **module_args)
module.libraries = ["http"]
module.service_discovery_signatures = ["http"]
module.load()
assert module.discovered_services == {"http": [80, 443]}