Validating IPs using the pre-defined functions in ip.py to improve performance

This commit is contained in:
Aarush289 2025-10-30 19:58:13 +05:30
parent ec9726692f
commit 2832342e42
No known key found for this signature in database
3 changed files with 31 additions and 70 deletions

View File

@ -4,15 +4,17 @@ import os
import shutil
import socket
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Thread
import multiprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from nettacker import logger
from nettacker.config import Config, version_info
from nettacker.core.arg_parser import ArgParser
from nettacker.core.die import die_failure
from nettacker.core.graph import create_report, create_compare_report
from nettacker.core.hostcheck import resolve_quick
from nettacker.core.ip import (
get_ip_range,
generate_ip_range,
@ -23,7 +25,6 @@ from nettacker.core.ip import (
is_ipv6_range,
is_ipv6_cidr,
)
from nettacker.core.hostcheck import resolve_quick, is_ip_literal, valid_hostname
from nettacker.core.messages import messages as _
from nettacker.core.module import Module
from nettacker.core.socks_proxy import set_socks_proxy
@ -289,7 +290,6 @@ class Nettacker(ArgParser):
return os.EX_OK
def filter_valid_targets(self, targets, timeout_per_target=2.0, max_threads=None, dedupe=True):
"""
Parallel validation of targets via resolve_quick(target, timeout_sec).
@ -341,8 +341,7 @@ class Nettacker(ArgParser):
return filtered
def scan_target_group(self, targets, scan_id, process_number):
if(not self.arguments.socks_proxy and self.arguments.validate_before_scan):
if not self.arguments.socks_proxy and self.arguments.validate_before_scan:
targets = self.filter_valid_targets(
targets,
timeout_per_target=2.0,

View File

@ -362,7 +362,6 @@ class ArgParser(ArgumentParser):
default=Config.settings.validate_before_scan,
dest="validate_before_scan",
help=_("validate_before_scan"),
)
method_options.add_argument(
"-t",

View File

@ -1,52 +1,19 @@
# nettacker/core/hostcheck.py
from __future__ import annotations
import concurrent.futures
import re
import socket
import time
import concurrent.futures
from nettacker import logger
from nettacker.core.ip import (
get_ip_range,
generate_ip_range,
is_single_ipv4,
is_ipv4_range,
is_ipv4_cidr,
is_single_ipv6,
is_ipv6_range,
is_ipv6_cidr,
)
from nettacker.core.ip import is_single_ipv4, is_single_ipv6
log = logger.get_logger()
_LABEL = re.compile(r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$")
_IPV4_VALID_RE = re.compile(
r'^(?:'
r'(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}'
r'(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)'
r'$'
)
def is_valid_ipv4(s: str) -> bool:
return bool(_IPV4_VALID_RE.match(s))
def is_ip_literal(name: str) -> bool:
"""Return True if name is a valid IPv4 or IPv6 address literal."""
if is_single_ipv4(name):
if is_valid_ipv4(name):
return True
else:
return False
else:
try:
socket.inet_pton(socket.AF_INET6, name)
return True
except OSError:
return False
def valid_hostname(
host: str,
allow_single_label: bool = True
) -> bool:
def valid_hostname(host: str, allow_single_label: bool = True) -> bool:
"""
Validate hostname syntax per RFC 1123.
Args:
@ -56,7 +23,9 @@ def valid_hostname(
Returns:
True if the hostname is syntactically valid.
"""
if host.endswith("."): # From RFC 1123 ,the number of characters can be 250 at max (without dots) and 253 with dots
if host.endswith(
"."
): # From RFC 1123 ,the number of characters can be 250 at max (without dots) and 253 with dots
host = host[:-1]
if len(host) > 253:
return False
@ -68,9 +37,8 @@ def valid_hostname(
def _gai_once(name: str, use_ai_addrconfig: bool, port):
flags = getattr(socket, "AI_ADDRCONFIG", 0) if use_ai_addrconfig else 0
return socket.getaddrinfo(
name, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, flags
)
return socket.getaddrinfo(name, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, flags)
def _clean_host(s: str) -> str:
# remove surrounding quotes and whitespaces
@ -82,10 +50,9 @@ def _clean_host(s: str) -> str:
# collapse accidental spaces inside
return s
def resolve_quick(
host: str,
timeout_sec: float = 2.0,
allow_single_label: bool = True
host: str, timeout_sec: float = 2.0, allow_single_label: bool = True
) -> tuple[bool, str | None]:
"""
Perform fast DNS resolution with timeout.
@ -98,10 +65,8 @@ def resolve_quick(
(True, host_name) on success, (False, None) on failure/timeout.
"""
host = _clean_host(host)
if is_single_ipv4(host) or is_single_ipv6(host):
if is_ip_literal(host):
if is_single_ipv4(host) or is_single_ipv6(host): # IP literal, no resolution needed
return True, host
return False, None
if host.endswith("."):
host = host[:-1]
@ -125,5 +90,3 @@ def resolve_quick(
# DNS resolution failed for this candidate, try next
continue
return False, None