diff --git a/docs/Usage.md b/docs/Usage.md index 2896e214..d140cef5 100644 --- a/docs/Usage.md +++ b/docs/Usage.md @@ -37,7 +37,7 @@ usage: Nettacker [-L LANGUAGE] [-v] [--verbose-event] [-V] [-o REPORT_PATH_FILEN [-i TARGETS] [-l TARGETS_LIST] [-m SELECTED_MODULES] [--modules-extra-args MODULES_EXTRA_ARGS] [--show-all-modules] [--profile PROFILES] [--show-all-profiles] [-x EXCLUDED_MODULES] [-u USERNAMES] [-U USERNAMES_LIST] [-p PASSWORDS] [-P PASSWORDS_LIST] [-g PORTS] [--user-agent USER_AGENT] - [-T TIMEOUT] [-w TIME_SLEEP_BETWEEN_REQUESTS] [-r] [-s] [-d] [-t THREAD_PER_HOST] + [-T TIMEOUT] [-w TIME_SLEEP_BETWEEN_REQUESTS] [-r] [-s] [-d] [-t THREAD_PER_HOST] [-H HTTP_HEADER] [-M PARALLEL_MODULE_SCAN] [--set-hardware-usage SET_HARDWARE_USAGE] [-R SOCKS_PROXY] [--retries RETRIES] [--ping-before-scan] [-K SCAN_COMPARE_ID] [-J COMPARE_REPORT_PATH_FILENAME] [--start-api] [--api-host API_HOSTNAME] [--api-port API_PORT] [--api-debug-mode] @@ -124,6 +124,9 @@ Method: compare current scan to old scans using the unique scan_id -J COMPARE_REPORT_PATH_FILENAME, --compare-report-path COMPARE_REPORT_PATH_FILENAME the file-path to store the compare_scan report + -H HTTP_HEADER, --add-http-header HTTP_HEADER + Add custom HTTP headers to requests (format: 'key: value'). For multiple headers, use multiple -H flags + API: API options @@ -340,6 +343,12 @@ python nettacker.py -i -m f5_cve_2020_5902 -s python nettacker.py -i owasp.org -s -m port_scan -t 10 -M 35 -g 20-100 --graph d3_tree_v2_graph ``` +* Using the `-H` command you can add your own HTTP headers to requests (useful for authentication) and chain it using multiple `-H` commands + +``` +python nettacker.py -i owasp.org -s -m http_status_scan -H "Authorization: Basic abcd" -H "Content-Type: abcd" -t 100 -d +``` + * If you use `-r` command, it will scan the IP range automatically by getting the range from the RIPE database online. ``` python nettacker.py -i owasp.org -s -r -m port_scan -t 10 -M 35 -g 20-100 --graph d3_tree_v2_graph @@ -488,6 +497,8 @@ def nettacker_user_application_config(): "graph_name": "d3_tree_v2_graph", "show_help_menu": False, "targets": None, + "url_base_path": None, + "http_header": None, "targets_list": None, "selected_modules": None, "excluded_modules": None, diff --git a/nettacker/config.py b/nettacker/config.py index 0801a4aa..450bcd20 100644 --- a/nettacker/config.py +++ b/nettacker/config.py @@ -40,6 +40,25 @@ class ConfigBase: yield from self.attributes +# Some sensitive header fields for HTTP requests. +# Please edit this if you don't want your HTTP header to be present in the logs +sensitive_headers = { + "authorization", + "proxy-authorization", + "cookie", + "set-cookie", + "x-api-key", + "x-amz-security-token", + "x-amz-credential", + "x-amz-signature", + "x-session-id", + "x-csrf-token", + "x-auth-token", + "x-user-token", + "x-id-token", +} + + class ApiConfig(ConfigBase): """OWASP Nettacker API Default Configuration""" @@ -134,6 +153,7 @@ class DefaultSettings(ConfigBase): scan_subdomains = False selected_modules = None url_base_path = None + http_header = None read_from_file = "" set_hardware_usage = "maximum" # low, normal, high, maximum show_all_modules = False diff --git a/nettacker/core/arg_parser.py b/nettacker/core/arg_parser.py index f27f12fe..0fa42679 100644 --- a/nettacker/core/arg_parser.py +++ b/nettacker/core/arg_parser.py @@ -415,6 +415,14 @@ class ArgParser(ArgumentParser): dest="read_from_file", help=_("user_wordlist"), ) + method_options.add_argument( + "-H", + "--add-http-header", + action="append", + default=Config.settings.http_header, + dest="http_header", + help=_("http_header"), + ) # API Options api_options = self.add_argument_group(_("API"), _("API_options")) diff --git a/nettacker/core/lib/base.py b/nettacker/core/lib/base.py index 98573bf2..110e51dc 100644 --- a/nettacker/core/lib/base.py +++ b/nettacker/core/lib/base.py @@ -9,7 +9,7 @@ import yaml from nettacker.config import Config from nettacker.core.messages import messages as _ -from nettacker.core.utils.common import merge_logs_to_list +from nettacker.core.utils.common import merge_logs_to_list, remove_sensitive_header_keys from nettacker.database.db import find_temp_events, submit_temp_logs_to_db, submit_logs_to_db from nettacker.logger import get_logger, TerminalCodes @@ -120,6 +120,8 @@ class BaseEngine(ABC): request_number_counter, total_number_of_requests, ): + # Remove sensitive keys from headers before submitting to DB + event = remove_sensitive_header_keys(event) if "save_to_temp_events_only" in event.get("response", ""): submit_temp_logs_to_db( { diff --git a/nettacker/core/lib/http.py b/nettacker/core/lib/http.py index dfac3ef5..a9005e42 100644 --- a/nettacker/core/lib/http.py +++ b/nettacker/core/lib/http.py @@ -10,7 +10,12 @@ import aiohttp import uvloop from nettacker.core.lib.base import BaseEngine -from nettacker.core.utils.common import replace_dependent_response, reverse_and_regex_condition +from nettacker.core.utils.common import ( + replace_dependent_response, + reverse_and_regex_condition, + get_http_header_key, + get_http_header_value, +) asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) @@ -144,6 +149,14 @@ class HttpEngine(BaseEngine): request_number_counter, total_number_of_requests, ): + if options["http_header"] is not None: + for header in options["http_header"]: + key = get_http_header_key(header).strip() + value = get_http_header_value(header) + if value is not None: + sub_step["headers"][key] = value.strip() + else: + sub_step["headers"][key] = "" backup_method = copy.deepcopy(sub_step["method"]) backup_response = copy.deepcopy(sub_step["response"]) backup_iterative_response_match = copy.deepcopy( diff --git a/nettacker/core/utils/common.py b/nettacker/core/utils/common.py index 10778373..6418bbb9 100644 --- a/nettacker/core/utils/common.py +++ b/nettacker/core/utils/common.py @@ -98,6 +98,65 @@ def terminate_thread(thread, verbose=True): return True +def get_http_header_key(http_header): + """ + Return the HTTP header key based on the full string entered by the user + Args: + http_header: a string entered by the user following the -H flag + Returns: + 1. The HTTP header key if http_header is a key-value pair + 2. The http_header itself if http_header is NOT a key_value pair (i.e. http_header is a plain string) + 3. An empty string if http_header is empty + Example: + http_header: "Authorization: Bearer abcdefgh" + Returns -> "Authorization" + """ + # Split only at the first ":" + return http_header.split(":", 1)[0].strip() + + +def get_http_header_value(http_header): + """ + Return the HTTP header value based on the full string entered by the user + Args: + http_header: a string entered by the user following the -H flag + Returns: + 1. The HTTP header value if http_header is a key-value pair + 2. None if the http_header is empty or NOT a key-value pair + Example: + http_header: "Authorization: Bearer abcdefgh" + Returns -> "Bearer abcdefgh" + """ + if not http_header or ":" not in http_header: + return None + # Split only at the first ":" + value = http_header.split(":", 1)[1].strip() + return value if value else None + + +def remove_sensitive_header_keys(event): + """ + Removes the sensitive headers that the user might add + Args: + event: The json event which contains the headers + Returns: + event: The json event without the sensitive headers + """ + from nettacker.config import sensitive_headers + + if not isinstance(event, dict): + return event + + if "headers" in event: + if not isinstance(event["headers"], dict): + return event + for key in list(event["headers"].keys()): + if key.lower() in sensitive_headers: + del event["headers"][key] + + return event + + def find_args_value(args_name): try: return sys.argv[sys.argv.index(args_name) + 1] diff --git a/nettacker/locale/en.yaml b/nettacker/locale/en.yaml index c359a172..0c3ab0ba 100644 --- a/nettacker/locale/en.yaml +++ b/nettacker/locale/en.yaml @@ -126,3 +126,4 @@ compare_report_saved: "compare results saved in {0}" build_compare_report: "building compare report" finish_build_report: "Finished building compare report" user_wordlist: "Allows users to enter their own wordlist" +http_header: "Add custom HTTP headers to requests (format: 'key: value'). For multiple headers, use multiple -H flags" diff --git a/poetry.lock b/poetry.lock index 2d769723..cc651197 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. [[package]] name = "aiohappyeyeballs" @@ -218,6 +218,19 @@ docs = ["cogapp", "furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphi tests = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.9\"", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.9\" and python_version < \"3.13\"", "pytest-xdist[psutil]"] tests-mypy = ["mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.9\"", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.9\" and python_version < \"3.13\""] +[[package]] +name = "backports-asyncio-runner" +version = "1.2.0" +description = "Backport of asyncio.Runner, a context manager that controls event loop life cycle." +optional = false +python-versions = "<3.11,>=3.8" +groups = ["dev"] +markers = "python_version < \"3.11\"" +files = [ + {file = "backports_asyncio_runner-1.2.0-py3-none-any.whl", hash = "sha256:0da0a936a8aeb554eccb426dc55af3ba63bcdc69fa1a600b5bb305413a4477b5"}, + {file = "backports_asyncio_runner-1.2.0.tar.gz", hash = "sha256:a5aa7b2b7d8f8bfcaa2b57313f70792df84e32a2a746f585213373f900b42162"}, +] + [[package]] name = "bcrypt" version = "4.2.0" @@ -959,7 +972,7 @@ description = "Read metadata from Python packages" optional = false python-versions = ">=3.8" groups = ["main"] -markers = "python_version < \"3.10\"" +markers = "python_version == \"3.9\"" files = [ {file = "importlib_metadata-8.4.0-py3-none-any.whl", hash = "sha256:66f342cc6ac9818fc6ff340576acd24d65ba0b3efabb2b4ac08b598965a4a2f1"}, {file = "importlib_metadata-8.4.0.tar.gz", hash = "sha256:9a547d3bc3608b025f93d403fdd1aae741c24fbb8314df4b155675742ce303c5"}, @@ -979,7 +992,7 @@ version = "2.0.0" description = "brain-dead simple config-ini parsing" optional = false python-versions = ">=3.7" -groups = ["test"] +groups = ["dev", "test"] files = [ {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"}, {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, @@ -1347,7 +1360,7 @@ version = "24.1" description = "Core utilities for Python packages" optional = false python-versions = ">=3.8" -groups = ["test"] +groups = ["dev", "test"] files = [ {file = "packaging-24.1-py3-none-any.whl", hash = "sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124"}, {file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"}, @@ -1413,7 +1426,7 @@ version = "1.5.0" description = "plugin and hook calling mechanisms for python" optional = false python-versions = ">=3.8" -groups = ["test"] +groups = ["dev", "test"] files = [ {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, @@ -1637,7 +1650,7 @@ version = "8.3.3" description = "pytest: simple powerful testing with Python" optional = false python-versions = ">=3.8" -groups = ["test"] +groups = ["dev", "test"] files = [ {file = "pytest-8.3.3-py3-none-any.whl", hash = "sha256:a6853c7375b2663155079443d2e45de913a911a11d669df02a50814944db57b2"}, {file = "pytest-8.3.3.tar.gz", hash = "sha256:70b98107bd648308a7952b06e6ca9a50bc660be218d53c257cc1fc94fda10181"}, @@ -1654,6 +1667,27 @@ tomli = {version = ">=1", markers = "python_version < \"3.11\""} [package.extras] dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] +[[package]] +name = "pytest-asyncio" +version = "1.1.0" +description = "Pytest support for asyncio" +optional = false +python-versions = ">=3.9" +groups = ["dev"] +files = [ + {file = "pytest_asyncio-1.1.0-py3-none-any.whl", hash = "sha256:5fe2d69607b0bd75c656d1211f969cadba035030156745ee09e7d71740e58ecf"}, + {file = "pytest_asyncio-1.1.0.tar.gz", hash = "sha256:796aa822981e01b68c12e4827b8697108f7205020f24b5793b3c41555dab68ea"}, +] + +[package.dependencies] +backports-asyncio-runner = {version = ">=1.1,<2", markers = "python_version < \"3.11\""} +pytest = ">=8.2,<9" +typing-extensions = {version = ">=4.12", markers = "python_version < \"3.10\""} + +[package.extras] +docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1)"] +testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"] + [[package]] name = "pytest-cov" version = "6.0.0" @@ -1953,12 +1987,12 @@ version = "2.0.1" description = "A lil' TOML parser" optional = false python-versions = ">=3.7" -groups = ["test"] -markers = "python_full_version <= \"3.11.0a6\"" +groups = ["dev", "test"] files = [ {file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"}, {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, ] +markers = {dev = "python_version < \"3.11\"", test = "python_full_version <= \"3.11.0a6\""} [[package]] name = "traitlets" @@ -1987,7 +2021,7 @@ files = [ {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, ] -markers = {dev = "python_version < \"3.10\""} +markers = {dev = "python_version == \"3.9\""} [[package]] name = "urllib3" @@ -2218,4 +2252,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.9, <3.13" -content-hash = "2b386c255454b238be477075918868cec7220c7dd39fadfe94adbd3d2bb96949" +content-hash = "0377e9d8f656d26bacf2ec6faa811d7172263b595430dc8ce21c5911538d3ed3" diff --git a/pyproject.toml b/pyproject.toml index 743d40f0..a2222588 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,6 +69,7 @@ impacket = "^0.11.0" [tool.poetry.group.dev.dependencies] ipython = "^8.16.1" ruff = ">=0.2.1,<0.8.0" +pytest-asyncio = "^1.1.0" [tool.poetry.group.test.dependencies] coverage = "^7.3.2" diff --git a/tests/core/lib/test_user_added_http_headers.py b/tests/core/lib/test_user_added_http_headers.py new file mode 100644 index 00000000..2a08f8d8 --- /dev/null +++ b/tests/core/lib/test_user_added_http_headers.py @@ -0,0 +1,500 @@ +import asyncio +import importlib +import time + +import pytest + +http = importlib.import_module("nettacker.core.lib.http") + +# ---------------------------- +# Helpers / Fakes +# ---------------------------- + + +class FakeContent: + def __init__(self, data: bytes): + self._data = data + + async def read(self): + return self._data + + +class FakeResponse: + def __init__( + self, *, reason="OK", url="http://example.com", status=200, headers=None, body=b"body" + ): + self.reason = reason + self.url = url + self.status = status + self.headers = headers or {"X-Test": "yes"} + self.content = FakeContent(body) + + +class FakeCtx: + """ + Mimic aiohttp's _RequestContextManager: awaitable + async context manager. + """ + + def __init__(self, response: FakeResponse): + self._response = response + + def __await__(self): + async def _inner(): + return self._response + + return _inner().__await__() + + async def __aenter__(self): + return self._response + + async def __aexit__(self, exc_type, exc, tb): + return False + + +class FakeSession: + """ + Mimic aiohttp.ClientSession with a single method (get/post/etc.) returning FakeCtx. + """ + + def __init__(self, method_response_map): + self._method_response_map = method_response_map + + def __getattr__(self, name): + if name in self._method_response_map: + + def _caller(**kwargs): + return FakeCtx(self._method_response_map[name]) + + return _caller + raise AttributeError(name) + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + +class DummyEngine(http.HttpEngine): + """ + Override BaseEngine interactions to isolate run(). + """ + + def __init__(self): + pass + + def get_dependent_results_from_database(self, *args, **kwargs): + return {"token": "ABC123"} + + def replace_dependent_values(self, sub_step, temp_event): + sub_step = dict(sub_step) + sub_step["url"] = sub_step["url"].replace("{{token}}", temp_event["token"]) + return sub_step + + def process_conditions( + self, + sub_step, + module_name, + target, + scan_id, + options, + response, + process_number, + module_thread_number, + total_module_thread_number, + request_number_counter, + total_number_of_requests, + ): + # Make it observable + return { + "processed": True, + "sub_step": sub_step, + "response": response, + } + + +# ---------------------------- +# Tests for perform_request_action / send_request +# ---------------------------- + + +def test_perform_request_action_happy_path(monkeypatch): + # Freeze time to make responsetime predictable + times = [1000.0, 1001.0] # start, end + monkeypatch.setattr(time, "time", lambda: times.pop(0)) + + response = FakeResponse( + reason="Created", + url="http://example.com/hello", + status=201, + headers={"Content-Type": "text/plain"}, + body=b"hello", + ) + + async def run(): + return await http.perform_request_action( + lambda **_: FakeCtx(response), {"url": "http://example.com/hello"} + ) + + result = asyncio.run(run()) + assert result["reason"] == "Created" + assert result["url"] == "http://example.com/hello" + assert result["status_code"] == "201" + assert result["headers"]["Content-Type"] == "text/plain" + assert result["content"] == b"hello" + assert 0.99 <= result["responsetime"] <= 1.01 # ~1 second + + +@pytest.mark.asyncio +async def test_send_request_uses_session_and_method(monkeypatch): + fake_resp = FakeResponse(status=202, body=b"accepted") + fake_session = FakeSession({"get": fake_resp}) + + class _FakeClientSession: + def __init__(self, *a, **k): + self._sess = fake_session + + async def __aenter__(self): + return self._sess + + async def __aexit__(self, exc_type, exc, tb): + return False + + monkeypatch.setattr(http.aiohttp, "ClientSession", _FakeClientSession) + res = await http.send_request({"url": "http://example.com"}, "get") + assert res["status_code"] == "202" + assert res["content"] == b"accepted" + + +# ---------------------------- +# Tests for response_conditions_matched +# ---------------------------- + + +def test_response_conditions_matched_status_code_match(): + sub_step = { + "timeout": 3.0, + "headers": {"User-Agent": "Nettacker 0.4.0 QUIN"}, + "allow_redirects": False, + "ssl": False, + "url": "http://owasp.org:443", + "method": "get", + "response": { + "condition_type": "and", + "log": "response_dependent['status_code']", + "conditions": { + "status_code": {"regex": r"^200$", "reverse": False}, + "url": {"regex": r"owasp\.org", "reverse": False}, + "reason": {"regex": r"OK", "reverse": False}, + }, + }, + } + response = { + "status_code": "200", + "url": "http://owasp.org:443", + "reason": "OK", + "headers": {}, + "responsetime": 0.1, + "content": "body", + } + out = http.response_conditions_matched(sub_step, response) + assert out != {} + assert "log" in out # log field should be present + + +def test_response_conditions_or_typical_response_match(): + sub_step = { + "response": { + "condition_type": "or", + "log": "response_dependent['status_code']", + "conditions": { + "status_code": {"regex": r"\d\d\d", "reverse": False}, + "reason": {"regex": r"OK", "reverse": False}, + }, + } + } + response = { + "reason": "Moved Permanently", + "url": "http://owasp.org", + "status_code": "301", + "content": "

301 Moved Permanently

", + "headers": { + "Date": "Sat, 26 Jul 2025 09:28:43 GMT", + "Content-Type": "text/html", + "Content-Length": "167", + "Server": "cloudflare", + }, + "responsetime": 0.27, + } + + out = http.response_conditions_matched(sub_step, response) + assert out != {} + assert "log" in out + assert out["log"] == "301" + + +def test_response_conditions_or_typical_response_no_match(): + sub_step = { + "response": { + "condition_type": "or", + "log": "response_dependent['status_code']", + "conditions": { + "status_code": {"regex": r"404", "reverse": False}, + "reason": {"regex": r"OK", "reverse": False}, + }, + } + } + response = { + "reason": "Moved Permanently", + "url": "http://owasp.org", + "status_code": "301", # Does not match 404 + "content": "

301 Moved Permanently

", + "headers": { + "Date": "Sat, 26 Jul 2025 09:28:43 GMT", + "Content-Type": "text/html", + "Content-Length": "167", + "Server": "cloudflare", + }, + "responsetime": 0.27, + } + + out = http.response_conditions_matched(sub_step, response) + assert out == {}, "Expected empty result since no condition matched" + + +def test_response_conditions_headers_case_insensitive(): + sub_step = { + "timeout": 3.0, + "headers": {"User-Agent": "Nettacker 0.4.0 QUIN"}, + "allow_redirects": False, + "ssl": False, + "url": "http://owasp.org:443", + "method": "get", + "response": { + "condition_type": "and", + "log": "response_dependent['status_code']", + "conditions": { + "headers": { + "Content-Type": {"regex": r"text/html", "reverse": False}, + } + }, + }, + } + response = { + "status_code": "200", + "url": "http://owasp.org:443", + "reason": "OK", + "headers": { + "content-type": "text/html", # Lowercase key to test case-insensitivity + }, + "responsetime": 0.05, + "content": "body", + } + out = http.response_conditions_matched(sub_step, response) + assert out != {} + assert out["headers"]["Content-Type"] == ["text/html"] + + +def test_response_conditions_responsetime(monkeypatch): + sub_step = { + "timeout": 3.0, + "headers": {"User-Agent": "Nettacker 0.4.0 QUIN"}, + "allow_redirects": False, + "ssl": False, + "url": "http://owasp.org:443", + "method": "get", + "response": { + "condition_type": "and", + "log": "response_dependent['status_code']", + "conditions": {"responsetime": ">= 0.5"}, + }, + } + response = { + "status_code": "200", + "url": "http://owasp.org:443", + "reason": "OK", + "headers": {}, + "responsetime": 0.7, + "content": "body", + } + assert http.response_conditions_matched(sub_step, response) != {} + + response["responsetime"] = 0.2 + assert http.response_conditions_matched(sub_step, response) == {} + + +# ---------------------------- +# Tests for HttpEngine.run +# ---------------------------- +def test_httpengine_run_happy_path_merges_headers_and_random_ua(monkeypatch): + engine = DummyEngine() + + # Patch random.choice to deterministic value + monkeypatch.setattr(http.random, "choice", lambda seq: seq[0]) + + # Patch send_request to bypass network + async def fake_send_request(options, method): + return { + "reason": "OK", + "url": options["url"], + "status_code": "200", + "content": b"OK", + "headers": {"Server": "nginx"}, + "responsetime": 0.1, + } + + monkeypatch.setattr(http, "send_request", fake_send_request) + + sub_step = { + "method": "get", + "timeout": 3.0, + "headers": {"User-Agent": "Nettacker 0.4.0 QUIN"}, + "allow_redirects": False, + "ssl": False, + "url": "http://owasp.org:80", + "response": { + "condition_type": "and", + "log": "response_dependent['status_code']", + "conditions": {"status_code": {"regex": r"200", "reverse": False}}, + }, + } + options = { + "http_header": ["X-Token: 12345", "X-Empty:"], + "user_agent": "random_user_agent", + "user_agents": ["ua1", "ua2"], + "retries": 1, + } + + result = engine.run( + sub_step=sub_step, + module_name="mod", + target="t", + scan_id="id", + options=options, + process_number=0, + module_thread_number=0, + total_module_thread_number=1, + request_number_counter=1, + total_number_of_requests=1, + ) + + assert result["processed"] is True + # Confirm merged headers + assert result["sub_step"]["headers"]["X-Token"] == "12345" + assert result["sub_step"]["headers"]["X-Empty"] == "" + assert result["sub_step"]["headers"]["User-Agent"] == "ua1" + # Confirm content decoded + assert result["response"]["content"] == "OK" + + +def test_httpengine_run_with_iterative_response_match(monkeypatch): + engine = DummyEngine() + + async def fake_send_request(options, method): + return { + "reason": "OK", + "url": options["url"], + "status_code": "200", + "content": b"pattern abc", + "headers": {"Server": "nginx"}, + "responsetime": 0.1, + } + + monkeypatch.setattr(http, "send_request", fake_send_request) + + sub_step = { + "method": "get", + "timeout": 3.0, + "headers": {"User-Agent": "Nettacker 0.4.0 QUIN"}, + "allow_redirects": False, + "ssl": False, + "url": "http://owasp.org:80", + "response": { + "condition_type": "or", # will still evaluate the iterative section + "log": "response_dependent['status_code']", + "conditions": { + "status_code": {"regex": r"200", "reverse": False}, + "iterative_response_match": { + "match1": { + "response": { + "condition_type": "and", + "conditions": {"content": {"regex": r"abc", "reverse": False}}, + } + } + }, + }, + }, + } + options = {"http_header": None, "user_agent": "", "user_agents": [], "retries": 1} + + result = engine.run( + sub_step=sub_step, + module_name="mod", + target="t", + scan_id="id", + options=options, + process_number=0, + module_thread_number=0, + total_module_thread_number=1, + request_number_counter=1, + total_number_of_requests=1, + ) + + # Ensure nested condition was evaluated and present + assert "iterative_response_match" in result["sub_step"]["response"]["conditions"] + assert "conditions_results" in result["sub_step"]["response"] + assert "match1" in result["sub_step"]["response"]["conditions_results"] + + +def test_httpengine_run_with_dependent_on_temp_event(monkeypatch): + class DepEngine(DummyEngine): + def __init__(self): + super().__init__() + + engine = DepEngine() + + async def fake_send_request(options, method): + # url should have been replaced with ABC123 + assert options["url"] == "http://owasp.org/token/ABC123" + return { + "reason": "OK", + "url": options["url"], + "status_code": "200", + "content": b"ok", + "headers": {}, + "responsetime": 0.1, + } + + monkeypatch.setattr(http, "send_request", fake_send_request) + + sub_step = { + "method": "get", + "timeout": 3.0, + "headers": {"User-Agent": "Nettacker 0.4.0 QUIN"}, + "allow_redirects": False, + "ssl": False, + "url": "http://owasp.org/token/{{token}}", + "response": { + "dependent_on_temp_event": {"module": "m", "event": "e"}, + "condition_type": "and", + "log": "response_dependent['status_code']", + "conditions": {"status_code": {"regex": r"200", "reverse": False}}, + }, + } + options = {"http_header": None, "user_agent": "", "user_agents": [], "retries": 1} + + result = engine.run( + sub_step=sub_step, + module_name="mod", + target="t", + scan_id="id", + options=options, + process_number=0, + module_thread_number=0, + total_module_thread_number=1, + request_number_counter=1, + total_number_of_requests=1, + ) + + assert result["processed"] is True + assert "ABC123" in result["response"]["url"]