Added SSL/TLS Modules

This commit is contained in:
Captain-T2004 2024-08-25 19:49:53 +05:30
parent 8c86f6239b
commit e8f57c1d16
10 changed files with 988 additions and 5 deletions

View File

@ -24,5 +24,5 @@ The OWASP Nettacker Events page lists various conferences and meetups where the
* OWASP Nettacker was presented at **OWASP Kyiv** Chapter by **Sam Stepanyan** [[1](https://www.youtube.com/watch?v=KrwQlgeZn7I)]
* OWASP Nettacker was presented at the **AppSec Engineer** session by **Sam Stepanyan** [[1](https://www.youtube.com/watch?v=eXzIPuTtqAQ)]
* OWASP Nettacker was presented at **Security BSides Dublin 2022** conference by **Sam Stepanyan** [[1](https://www.youtube.com/watch?v=GcRFkZEaWqI)]
* OWASP Netacker was presented et the **Appplication Security Podcast** by **Sam Stepanyan** [[1](https://www.youtube.com/watch?v=tqZ8Lmucujw)]
* OWASP Netacker was presented at the **Appplication Security Podcast** by **Sam Stepanyan** [[1](https://www.youtube.com/watch?v=tqZ8Lmucujw)]
* OWASP Nettacker was presented at the **OWASP Global AppSec DC 2023 Conference** by **Sam Stepanyan** [[1](https://www.youtube.com/watch?v=yZxjBme029A)]

View File

@ -147,10 +147,11 @@ If you want to scan all ports please define -g 1-65535 range. Otherwise Nettacke
* '**ProFTPd_integer_overflow_vuln**' - check ProFTPd for CVE-2011-1137
* '**ProFTPd_memory_leak_vuln**' - check ProFTPd for CVE-2001-0136
* '**ProFTPd_restriction_bypass_vuln**' - check ProFTPd for CVE-2009-3639
* '**self_signed_certificate_vuln**' - check for self-signed SSL certificate
* '**server_version_vuln**' - check if the web server is leaking server banner in 'Server' response header
* '**ssl_certificate_expired_vuln**' - check if SSL certificate has expired
* '**weak_signature_algorithm_vuln**'- check if SSL certificate is signed using SHA-1
* '**ssl_signed_certificate_vuln**' - check for self-signed & other signing issues(weak signing algorithm) in SSL certificate
* '**ssl_expired_certificate_vuln**' - check if SSL certificate has expired or is close to expiring
* '**ssl_version_vuln**' - check if the server's SSL configuration supports old and insecure SSL versions
* '**ssl_weak_cipher_vuln**' - check if server's SSL configuration supports weak cipher suites
* '**wordpress_dos_cve_2018_6389_vuln**' - check if Wordpress is vulnerable to CVE-2018-6389 Denial Of Service (DOS)
* '**wp_xmlrpc_bruteforce_vuln**' - check if Wordpress is vulnerable to credential Brute Force via XMLRPC wp.getUsersBlogs
* '**wp_xmlrpc_pingback_vuln**' - check if Wordpress is vulnerable to XMLRPC pingback

254
nettacker/core/lib/ssl.py Normal file
View File

@ -0,0 +1,254 @@
#!/usr/bin/env python
import logging
import socket
import ssl
from datetime import datetime, timezone
from OpenSSL import crypto
from nettacker.core.lib.base import BaseEngine, BaseLibrary
log = logging.getLogger(__name__)
def is_weak_hash_algo(algo):
algo = algo.lower()
for unsafe_algo in ("md2", "md4", "md5", "sha1"):
if unsafe_algo in algo:
return True
return False
def is_weak_ssl_version(host, port, timeout):
def test_ssl_verison(host, port, timeout, ssl_version=None):
try:
context = ssl.SSLContext(ssl_version)
socket_connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_connection.settimeout(timeout)
socket_connection.connect((host, port))
socket_connection = context.wrap_socket(socket_connection, server_hostname=host)
return socket_connection.version()
except ssl.SSLError:
return False
except (socket.timeout, ConnectionRefusedError, ConnectionResetError):
return None
ssl_versions = (
ssl.PROTOCOL_TLS_CLIENT, # TLS 1.3
ssl.PROTOCOL_TLSv1_2,
ssl.PROTOCOL_TLSv1_1,
ssl.PROTOCOL_TLSv1,
)
supported_versions = []
lowest_version = ""
for ssl_version in ssl_versions:
version = test_ssl_verison(host, port, timeout, ssl_version=ssl_version)
if version:
lowest_version = version
supported_versions.append(version)
return (supported_versions, lowest_version not in {"TLSv1.2", "TLSv1.3"})
def is_weak_cipher_suite(host, port, timeout):
def test_single_cipher(host, port, cipher, timeout):
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
context.set_ciphers(cipher)
socket_connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_connection.settimeout(timeout)
socket_connection.connect((host, port))
socket_connection = context.wrap_socket(socket_connection, server_hostname=host)
return True
except ssl.SSLError:
return False
except (socket.timeout, ConnectionRefusedError, ConnectionResetError):
return None
cipher_suites = [
"HIGH", # OpenSSL cipher strings
"MEDIUM",
"LOW",
"EXP",
"eNULL",
"aNULL",
"RC4",
"DES",
"MD5",
"SHA1",
"DH",
"ADH",
"DHE",
"ECDH",
"ECDHE",
"TLSv1",
"TLSv1.1",
"TLSv1.2",
"TLSv1.3",
]
supported_ciphers = []
for cipher in cipher_suites:
if test_single_cipher(host, port, cipher, timeout):
supported_ciphers.append(cipher)
weak_ciphers = ["LOW", "EXP", "eNULL", "aNULL", "RC4", "DES", "MD5", "DH", "ADH"]
for cipher in supported_ciphers:
if cipher in weak_ciphers:
return (supported_ciphers, True)
return (supported_ciphers, False)
def create_tcp_socket(host, port, timeout):
try:
socket_connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_connection.settimeout(timeout)
socket_connection.connect((host, port))
ssl_flag = False
except ConnectionRefusedError:
return None
try:
socket_connection = ssl.wrap_socket(socket_connection)
ssl_flag = True
except Exception:
socket_connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_connection.settimeout(timeout)
socket_connection.connect((host, port))
return socket_connection, ssl_flag
class SslLibrary(BaseLibrary):
def ssl_certificate_scan(self, host, port, timeout):
tcp_socket = create_tcp_socket(host, port, timeout)
if tcp_socket is None:
return None
socket_connection, ssl_flag = tcp_socket
peer_name = socket_connection.getpeername()
if ssl_flag:
cert = ssl.get_server_certificate((host, port))
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
weak_signing_algo = is_weak_hash_algo(str(x509.get_signature_algorithm()))
cert_activation = datetime.strptime(
x509.get_notBefore().decode("utf-8"), "%Y%m%d%H%M%S%z"
)
cert_expires = datetime.strptime(x509.get_notAfter().decode("utf-8"), "%Y%m%d%H%M%S%z")
return {
"expired": x509.has_expired(),
"self_signed": x509.get_issuer() == x509.get_subject(),
"signing_algo": str(x509.get_signature_algorithm()),
"weak_signing_algo": weak_signing_algo,
"activation_date": cert_activation.strftime("%d/%m/%Y"),
"not_activated": (cert_activation - datetime.now(timezone.utc)).days > 0,
"expiration_date": cert_expires.strftime("%d/%m/%Y"),
"expiring_soon": (cert_expires - datetime.now(timezone.utc)).days < 30,
"ssl_flag": ssl_flag,
"peer_name": peer_name,
"service": socket.getservbyport(int(port)),
}
return {
"ssl_flag": ssl_flag,
"peer_name": peer_name,
"service": socket.getservbyport(int(port)),
}
def ssl_version_and_cipher_scan(self, host, port, timeout):
tcp_socket = create_tcp_socket(host, port, timeout)
if tcp_socket is None:
return None
socket_connection, ssl_flag = tcp_socket
peer_name = socket_connection.getpeername()
if ssl_flag:
ssl_ver, weak_version = is_weak_ssl_version(host, port, timeout)
cipher_suite, weak_cipher_suite = is_weak_cipher_suite(host, port, timeout)
return {
"ssl_version": ssl_ver,
"weak_version": weak_version,
"cipher_suite": cipher_suite,
"weak_cipher_suite": weak_cipher_suite,
"ssl_flag": ssl_flag,
"peer_name": peer_name,
"service": socket.getservbyport(int(port)),
}
return {
"ssl_flag": ssl_flag,
"service": socket.getservbyport(int(port)),
"peer_name": peer_name,
}
class SslEngine(BaseEngine):
library = SslLibrary
def response_conditions_matched(self, sub_step, response):
conditions = sub_step["response"]["conditions"]
condition_type = sub_step["response"]["condition_type"]
condition_results = {}
if sub_step["method"] in {
"ssl_certificate_scan",
"ssl_version_and_cipher_scan",
}:
if response and response["ssl_flag"]:
for condition in conditions:
if "grouped_conditions" in condition:
gc_type = conditions[condition]["condition_type"]
gc_conditions = conditions[condition]["conditions"]
gc_condition_results = {}
for gc_condition in gc_conditions:
if (
gc_conditions[gc_condition]["reverse"]
and not response[gc_condition]
):
gc_condition_results[gc_condition] = not response[gc_condition]
elif (
not gc_conditions[gc_condition]["reverse"]
and response[gc_condition]
):
gc_condition_results[gc_condition] = response[gc_condition]
if gc_type == "and":
gc_condition_results = (
gc_condition_results
if len(gc_condition_results) == len(gc_conditions)
else {}
)
condition_results.update(gc_condition_results)
elif (conditions[condition]["reverse"] and not response[condition]) or (
not conditions[condition]["reverse"] and response[condition]
):
condition_results[condition] = True
if condition_type == "and":
return condition_results if len(condition_results) == len(conditions) else []
if condition_type == "or":
return condition_results if condition_results else []
return []
return []
def apply_extra_data(self, sub_step, response):
sub_step["response"]["ssl_flag"] = (
response["ssl_flag"] if isinstance(response, dict) else False
)
sub_step["response"]["conditions_results"] = self.response_conditions_matched(
sub_step, response
)

View File

@ -45,7 +45,15 @@ class Module:
self.skip_service_discovery = options.skip_service_discovery
self.discovered_services = None
self.ignored_core_modules = ["subdomain_scan", "icmp_scan", "port_scan"]
self.ignored_core_modules = [
"subdomain_scan",
"icmp_scan",
"port_scan",
"ssl_version_vuln",
"ssl_weak_cipher_vuln",
"ssl_signed_certificate_vuln",
"ssl_expired_certificate_vuln",
]
contents = TemplateLoader("port_scan", {"target": ""}).load()
self.service_discovery_signatures = list(

View File

@ -0,0 +1,52 @@
info:
name: ssl_certificate_vuln
author: Captain-T2004
severity: 6
description: check if there are any ssl_certificate vulnerabilities present
reference:
- https://www.beyondsecurity.com/resources/vulnerabilities/ssl-certificate-expiry
profiles:
- scan
- ssl
payloads:
- library: ssl
steps:
- method: ssl_certificate_scan
timeout: 3
host: "{target}"
ports:
- 21
- 25
- 110
- 143
- 443
- 587
- 990
- 1080
- 8080
response:
condition_type: or
conditions:
grouped_conditions_1:
condition_type: and
conditions:
expired:
reverse: false
expiration_date:
reverse: false
grouped_conditions_2:
condition_type: and
conditions:
expiring_soon:
reverse: false
expiration_date:
reverse: false
grouped_conditions_3:
condition_type: and
conditions:
not_activated:
reverse: false
activation_date:
reverse: false

View File

@ -0,0 +1,39 @@
info:
name: ssl_certificate_vuln
author: Captain-T2004
severity: 6
description: check if there are any ssl_certificate vulnerabilities present
reference:
- https://www.ssl.com/article/ssl-tls-self-signed-certificates/
profiles:
- scan
- ssl
payloads:
- library: ssl
steps:
- method: ssl_certificate_scan
timeout: 3
host: "{target}"
ports:
- 21
- 25
- 110
- 143
- 443
- 587
- 990
- 1080
- 8080
response:
condition_type: or
conditions:
self_signed:
reverse: false
grouped_conditions:
condition_type: and
conditions:
weak_signing_algo:
reverse: false
signing_algo:
reverse: false

View File

@ -0,0 +1,38 @@
info:
name: ssl_version_vuln
author: Captain-T2004
severity: 6
description: check if ssl version is unsafe or uses any bad ciphers.
reference:
- https://www.manageengine.com/privileged-access-management/help/ssl_vulnerability.html
- https://www.cloudflare.com/learning/ssl/why-use-tls-1.3/
profiles:
- scan
- ssl
payloads:
- library: ssl
steps:
- method: ssl_version_and_cipher_scan
timeout: 3
host: "{target}"
ports:
- 21
- 25
- 110
- 143
- 443
- 587
- 990
- 1080
- 8080
response:
condition_type: or
conditions:
grouped_conditions:
condition_type: and
conditions:
weak_version:
reverse: false
ssl_version:
reverse: false

View File

@ -0,0 +1,39 @@
info:
name: ssl_version_vuln
author: Captain-T2004
severity: 6
description: check if ssl version is unsafe or uses any bad ciphers.
reference:
- https://www.manageengine.com/privileged-access-management/help/ssl_vulnerability.html
- https://www.acunetix.com/vulnerabilities/web/tls-ssl-weak-cipher-suites/
profiles:
- scan
- ssl
payloads:
- library: ssl
steps:
- method: ssl_version_and_cipher_scan
timeout: 3
host: "{target}"
ports:
- 21
- 25
- 110
- 143
- 443
- 587
- 990
- 1080
- 8080
response:
condition_type: or
conditions:
grouped_conditions:
condition_type: and
conditions:
weak_cipher_suite:
reverse: false
cipher_suite:
reverse: false

166
tests/core/test_socket.py Normal file
View File

@ -0,0 +1,166 @@
from unittest.mock import patch
from nettacker.core.lib.socket import create_tcp_socket, SocketEngine
from tests.common import TestCase
class Responses:
tcp_connect_only = socket_icmp = {}
tcp_connect_send_and_receive = {
"response": 'HTTP/1.1 400 Bad Request\r\nServer: Apache/2.4.62 (Debian)\r\nContent-Length: 302\r\nConnection: close\r\nContent-Type: text/html; charset=iso-8859-1\r\n\r\n<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">\n<html><head>\n<title>400 Bad Request</title>\n</head><body>\n<h1>Bad Request</h1>\n<p>Your browser sent a request that this server could not understand.<br />\n</p>\n<hr>\n<address>Apache/2.4.62 (Debian)</address>\n</body></html>\n',
"ssl_flag": True,
}
ssl_version_scan = {
"ssl_version": "TLSv1",
"weak_version": True,
"weak_cipher_suite": True,
"ssl_flag": True,
}
none = None
class Substeps:
tcp_connect_send_and_receive = {
"method": "tcp_connect_send_and_receive",
"response": {
"condition_type": "or",
"conditions": {
"open_port": {"regex": "", "reverse": False},
"ftp": {
"regex": "220-You are user number|530 USER and PASS required|Invalid command: try being more creative|220 \\S+ FTP (Service|service|Server|server)|220 FTP Server ready|Directory status|Service closing control connection|Requested file action|Connection closed; transfer aborted|Directory not empty",
"reverse": False,
},
"ftps": {
"regex": "220-You are user number|530 USER and PASS required|Invalid command: try being more creative|220 \\S+ FTP (Service|service|Server|server)|220 FTP Server ready|Directory status|Service closing control connection|Requested file action|Connection closed; transfer aborted|Directory not empty",
"reverse": False,
},
"http": {
"regex": "HTTPStatus.BAD_REQUEST|HTTP\\/[\\d.]+\\s+[\\d]+|Server: |Content-Length: \\d+|Content-Type: |Access-Control-Request-Headers: |Forwarded: |Proxy-Authorization: |User-Agent: |X-Forwarded-Host: |Content-MD5: |Access-Control-Request-Method: |Accept-Language: ",
"reverse": False,
},
"imap": {
"regex": "Internet Mail Server|IMAP4 service|BYE Hi This is the IMAP SSL Redirect|LITERAL\\+ SASL\\-IR LOGIN\\-REFERRALS ID ENABLE IDLE AUTH\\=PLAIN AUTH\\=LOGIN AUTH\\=DIGEST\\-MD5 AUTH\\=CRAM-MD5|CAPABILITY completed|OK IMAPrev1|LITERAL\\+ SASL\\-IR LOGIN\\-REFERRALS ID ENABLE IDLE NAMESPACE AUTH\\=PLAIN AUTH\\=LOGIN|BAD Error in IMAP command received by server|IMAP4rev1 SASL-IR|OK \\[CAPABILITY IMAP4rev1",
"reverse": False,
},
"mariadb": {
"regex": "is not allowed to connect to this MariaDB server",
"reverse": False,
},
"mysql": {
"regex": "is not allowed to connect to this MySQL server",
"reverse": False,
},
"nntp": {
"regex": "NetWare\\-News\\-Server|NetWare nntpd|nntp|Leafnode nntpd|InterNetNews NNRP server INN",
"reverse": False,
},
"pop3": {
"regex": "POP3|POP3 gateway ready|POP3 Server|Welcome to mpopd|OK Hello there",
"reverse": False,
},
"pop3s": {
"regex": "POP3|POP3 gateway ready|POP3 Server|Welcome to mpopd|OK Hello there",
"reverse": False,
},
"portmap": {
"regex": "Program\tVersion\tProtocol\tPort|portmapper|nfs\t2|nlockmgr\t1",
"reverse": False,
},
"postgressql": {
"regex": "FATAL 1\\: invalid length of startup packet|received invalid response to SSL negotiation\\:|unsupported frontend protocol|fe\\_sendauth\\: no password supplied|no pg\\_hba\\.conf entry for host",
"reverse": False,
},
"pptp": {"regex": "Hostname: pptp server|Vendor: Fortinet pptp", "reverse": False},
"smtp": {
"regex": "Fidelix Fx2020|ESMTP|Server ready|SMTP synchronization error|220-Greetings|ESMTP Arnet Email Security|SMTP 2.0",
"reverse": False,
},
"smtps": {
"regex": "Fidelix Fx2020|ESMTP|Server ready|SMTP synchronization error|220-Greetings|ESMTP Arnet Email Security|SMTP 2.0",
"reverse": False,
},
"rsync": {"regex": "@RSYNCD\\:", "reverse": False},
"ssh": {
"regex": "openssh|\\-OpenSSH\\_|\\r\\nProtocol mism|\\_sshlib|\\x00\\x1aversion info line too long|SSH Windows NT Server|WinNT sshd|sshd| SSH Secure Shell|WinSSHD",
"reverse": False,
},
"telnet": {
"regex": "Check Point FireWall-1 authenticated Telnet server running on|Raptor Firewall Secure Gateway|No more connections are allowed to telnet server|Closing Telnet connection due to host problems|NetportExpress|WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING|Login authentication|recommended to use Stelnet|is not a secure protocol|Welcome to Microsoft Telnet Servic|no decompiling or reverse-engineering shall be allowed",
"reverse": False,
},
},
},
}
tcp_connect_only = {
"method": "tcp_connect_only",
"response": {
"condition_type": "or",
"conditions": {"time_response": {"regex": "", "reverse": False}},
},
}
socket_icmp = {
"method": "socket_icmp",
"response": {
"condition_type": "or",
"conditions": {"time_response": {"regex": "", "reverse": False}},
},
}
class TestSocketMethod(TestCase):
@patch("socket.socket")
@patch("ssl.wrap_socket")
def test_create_tcp_socket(self, mock_wrap, mock_socket):
HOST = "example.com"
PORT = 80
TIMEOUT = 60
create_tcp_socket(HOST, PORT, TIMEOUT)
socket_instance = mock_socket.return_value
socket_instance.settimeout.assert_called_with(TIMEOUT)
socket_instance.connect.assert_called_with((HOST, PORT))
mock_wrap.assert_called_with(socket_instance)
def test_response_conditions_matched(self):
# tests the response conditions matched for different scan methods
engine = SocketEngine()
Substep = Substeps()
Response = Responses()
# socket_icmp
self.assertEqual(
engine.response_conditions_matched(Substep.socket_icmp, Response.socket_icmp),
Response.socket_icmp,
)
# tcp_connect_send_and_receive, Port scan's substeps are taken for the test
self.assertEqual(
sorted(
engine.response_conditions_matched(
Substep.tcp_connect_send_and_receive, Response.tcp_connect_send_and_receive
)
),
sorted(
{"http": ["Content-Type: ", "Content-Length: 302", "HTTP/1.1 400", "Server: "]}
),
)
# tcp_connect_only
self.assertEqual(
engine.response_conditions_matched(
Substep.tcp_connect_only, Response.tcp_connect_only
),
Response.tcp_connect_only,
)
# * scans with response None i.e. TCP connection failed(None)
self.assertEqual(
engine.response_conditions_matched(
Substep.tcp_connect_send_and_receive, Response.none
),
[],
)

386
tests/core/test_ssl.py Normal file
View File

@ -0,0 +1,386 @@
import ssl
from unittest.mock import patch
from nettacker.core.lib.ssl import (
SslEngine,
SslLibrary,
create_tcp_socket,
is_weak_hash_algo,
is_weak_ssl_version,
is_weak_cipher_suite,
)
from tests.common import TestCase
class MockConnectionObject:
def __init__(self, peername, version=None):
self.Peername = peername
self.Version = version
def getpeername(self):
return self.Peername
def version(self):
return self.Version
class Mockx509Object:
def __init__(self, issuer, subject, is_expired, expire_date, activation_date, signing_algo):
self.issuer = issuer
self.subject = subject
self.expired = is_expired
self.expire_date = expire_date
self.activation_date = activation_date
self.signature_algorithm = signing_algo
def get_issuer(self):
return self.issuer
def get_subject(self):
return self.subject
def has_expired(self):
return self.expired
def get_notAfter(self):
return self.expire_date
def get_notBefore(self):
return self.activation_date
def get_signature_algorithm(self):
return self.signature_algorithm
class Responses:
ssl_version_vuln = {
"ssl_version": ["TLSv1"],
"weak_version": True,
"ssl_flag": True,
}
ssl_certificate_expired = {
"expired": True,
"expiration_date": "07/12/2023",
"not_activated": False,
"activation_date": "07/12/2023",
"expiring_soon": True,
"ssl_flag": True,
}
ssl_certificate_deactivated = {
"expired": False,
"expiration_date": "07/12/2100",
"expiring_soon": False,
"not_activated": True,
"activation_date": "07/12/2100",
"ssl_flag": True,
}
ssl_off = {"ssl_flag": False}
class Substeps:
ssl_version_vuln = {
"method": "ssl_version_and_cipher_scan",
"response": {
"condition_type": "or",
"conditions": {
"grouped_conditions": {
"condition_type": "and",
"conditions": {
"weak_version": {"reverse": False},
"ssl_version": {"reverse": False},
},
}
},
},
}
ssl_expired_certificate_scan = {
"method": "ssl_certificate_scan",
"response": {
"condition_type": "or",
"conditions": {
"grouped_conditions_1": {
"condition_type": "and",
"conditions": {
"expired": {"reverse": False},
"expiration_date": {"reverse": False},
},
},
"grouped_conditions_2": {
"condition_type": "and",
"conditions": {
"expiring_soon": {"reverse": False},
"expiration_date": {"reverse": False},
},
},
"grouped_conditions_3": {
"condition_type": "and",
"conditions": {
"not_activated": {"reverse": False},
"activation_date": {"reverse": False},
},
},
},
},
}
class TestSocketMethod(TestCase):
@patch("socket.socket")
@patch("ssl.wrap_socket")
def test_create_tcp_socket(self, mock_wrap, mock_socket):
HOST = "example.com"
PORT = 80
TIMEOUT = 60
create_tcp_socket(HOST, PORT, TIMEOUT)
socket_instance = mock_socket.return_value
socket_instance.settimeout.assert_called_with(TIMEOUT)
socket_instance.connect.assert_called_with((HOST, PORT))
mock_wrap.assert_called_with(socket_instance)
@patch("nettacker.core.lib.ssl.is_weak_cipher_suite")
@patch("nettacker.core.lib.ssl.is_weak_ssl_version")
@patch("nettacker.core.lib.ssl.create_tcp_socket")
def test_ssl_version_and_cipher_scan(self, mock_connection, mock_ssl_check, mock_cipher_check):
library = SslLibrary()
HOST = "example.com"
PORT = 80
TIMEOUT = 60
mock_connection.return_value = (MockConnectionObject(HOST, "TLSv1.3"), True)
mock_ssl_check.return_value = ("TLSv1.3", False)
mock_cipher_check.return_value = (["HIGH"], False)
self.assertEqual(
library.ssl_version_and_cipher_scan(HOST, PORT, TIMEOUT),
{
"ssl_flag": True,
"service": "http",
"weak_version": False,
"ssl_version": "TLSv1.3",
"peer_name": "example.com",
"cipher_suite": ["HIGH"],
"weak_cipher_suite": False,
},
)
mock_connection.return_value = (MockConnectionObject(HOST, "TLSv1.1"), True)
mock_ssl_check.return_value = ("TLSv1.1", True)
mock_cipher_check.return_value = (["LOW"], True)
self.assertEqual(
library.ssl_version_and_cipher_scan(HOST, PORT, TIMEOUT),
{
"ssl_flag": True,
"service": "http",
"weak_version": True,
"ssl_version": "TLSv1.1",
"peer_name": "example.com",
"cipher_suite": ["LOW"],
"weak_cipher_suite": True,
},
)
mock_connection.return_value = (MockConnectionObject(HOST), False)
self.assertEqual(
library.ssl_version_and_cipher_scan(HOST, PORT, TIMEOUT),
{
"ssl_flag": False,
"service": "http",
"peer_name": "example.com",
},
)
@patch("nettacker.core.lib.ssl.create_tcp_socket")
@patch("nettacker.core.lib.ssl.is_weak_hash_algo")
@patch("nettacker.core.lib.ssl.crypto.load_certificate")
@patch("nettacker.core.lib.ssl.ssl.get_server_certificate")
def test_ssl_certificate_scan(
self, mock_certificate, mock_x509, mock_hash_check, mock_connection
):
library = SslLibrary()
HOST = "example.com"
PORT = 80
TIMEOUT = 60
mock_hash_check.return_value = False
mock_connection.return_value = (MockConnectionObject(HOST, "TLSv1.3"), True)
mock_x509.return_value = Mockx509Object(
is_expired=False,
issuer="test_issuer",
subject="test_subject",
signing_algo="test_algo",
expire_date=b"21001207153045Z",
activation_date=b"20231207153045Z",
)
self.assertEqual(
library.ssl_certificate_scan(HOST, PORT, TIMEOUT),
{
"expired": False,
"ssl_flag": True,
"service": "http",
"self_signed": False,
"expiring_soon": False,
"expiration_date": "07/12/2100",
"not_activated": False,
"activation_date": "07/12/2023",
"signing_algo": "test_algo",
"weak_signing_algo": False,
"peer_name": "example.com",
},
)
mock_hash_check.return_value = True
mock_connection.return_value = (MockConnectionObject(HOST, "TLSv1.3"), True)
mock_x509.return_value = Mockx509Object(
is_expired=True,
issuer="test_issuer_subject",
subject="test_issuer_subject",
signing_algo="test_algo",
expire_date=b"21001207153045Z",
activation_date=b"21001207153045Z",
)
self.assertEqual(
library.ssl_certificate_scan(HOST, PORT, TIMEOUT),
{
"expired": True,
"ssl_flag": True,
"service": "http",
"self_signed": True,
"expiring_soon": False,
"expiration_date": "07/12/2100",
"not_activated": True,
"activation_date": "07/12/2100",
"signing_algo": "test_algo",
"weak_signing_algo": True,
"peer_name": "example.com",
},
)
mock_connection.return_value = (MockConnectionObject(HOST), False)
self.assertEqual(
library.ssl_certificate_scan(HOST, PORT, TIMEOUT),
{
"service": "http",
"ssl_flag": False,
"peer_name": "example.com",
},
)
mock_certificate.assert_called_with((HOST, PORT))
@patch("socket.socket")
@patch("ssl.create_default_context")
def test_is_weak_cipher_suite(self, mock_context, mock_socket):
HOST = "example.com"
PORT = 80
TIMEOUT = 60
socket_instance = mock_socket.return_value
context_instance = mock_context.return_value
cipher_list = [
"HIGH",
"MEDIUM",
"LOW",
"EXP",
"eNULL",
"aNULL",
"RC4",
"DES",
"MD5",
"SHA1",
"DH",
"ADH",
"DHE",
"ECDH",
"ECDHE",
"TLSv1",
"TLSv1.1",
"TLSv1.2",
"TLSv1.3",
]
self.assertEqual(is_weak_cipher_suite(HOST, PORT, TIMEOUT), (cipher_list, True))
context_instance.wrap_socket.assert_called_with(socket_instance, server_hostname=HOST)
socket_instance.settimeout.assert_called_with(TIMEOUT)
socket_instance.connect.assert_called_with((HOST, PORT))
context_instance.wrap_socket.side_effect = ssl.SSLError
self.assertEqual(is_weak_cipher_suite(HOST, PORT, TIMEOUT), ([], False))
def test_is_weak_hash_algo(self):
for algo in ("md2", "md4", "md5", "sha1"):
self.assertTrue(is_weak_hash_algo(algo))
self.assertFalse(is_weak_hash_algo("test_aglo"))
@patch("socket.socket")
@patch("ssl.SSLContext")
def test_is_weak_ssl_version(self, mock_context, mock_socket):
HOST = "example.com"
PORT = 80
TIMEOUT = 60
socket_instance = mock_socket.return_value
context_instance = mock_context.return_value
context_instance.wrap_socket.return_value = MockConnectionObject(HOST, "TLSv1.3")
self.assertEqual(
is_weak_ssl_version(HOST, PORT, TIMEOUT),
(["TLSv1.3", "TLSv1.3", "TLSv1.3", "TLSv1.3"], False),
)
context_instance.wrap_socket.return_value = MockConnectionObject(HOST, "TLSv1.1")
self.assertEqual(
is_weak_ssl_version(HOST, PORT, TIMEOUT),
(["TLSv1.1", "TLSv1.1", "TLSv1.1", "TLSv1.1"], True),
)
context_instance.wrap_socket.side_effect = ssl.SSLError
self.assertEqual(is_weak_ssl_version(HOST, PORT, TIMEOUT), ([], True))
context_instance.wrap_socket.side_effect = ConnectionRefusedError
self.assertEqual(is_weak_ssl_version(HOST, PORT, TIMEOUT), ([], True))
socket_instance.settimeout.assert_called_with(TIMEOUT)
socket_instance.connect.assert_called_with((HOST, PORT))
context_instance.wrap_socket.assert_called_with(socket_instance, server_hostname=HOST)
def test_response_conditions_matched(self):
# tests the response conditions matched for different scan methods
engine = SslEngine()
Substep = Substeps()
Response = Responses()
# ssl_certificate_scan_expired
self.assertEqual(
engine.response_conditions_matched(
Substep.ssl_expired_certificate_scan, Response.ssl_certificate_expired
),
{"expired": True, "expiration_date": "07/12/2023", "expiring_soon": True},
)
# ssl_certificate_scan_not_activated
self.assertEqual(
engine.response_conditions_matched(
Substep.ssl_expired_certificate_scan,
Response.ssl_certificate_deactivated,
),
{"not_activated": True, "activation_date": "07/12/2100"},
)
# ssl_version_vuln
self.assertEqual(
engine.response_conditions_matched(
Substep.ssl_version_vuln, Response.ssl_version_vuln
),
{"weak_version": True, "ssl_version": ["TLSv1"]},
)
# ssl_* scans with ssl_flag = False
self.assertEqual(
engine.response_conditions_matched(Substep.ssl_version_vuln, Response.ssl_off), []
)
# * scans with response None i.e. TCP connection failed(None)
self.assertEqual(engine.response_conditions_matched(Substep.ssl_version_vuln, None), [])