# Nettacker/nettacker/database/db.py

import json
import time
try:
import apsw
except ImportError:
apsw = None
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from nettacker import logger
from nettacker.api.helpers import structure
from nettacker.config import Config
from nettacker.core.messages import messages
from nettacker.database.models import HostsLog, Report, TempEvents
config = Config()
logger = logger.get_logger()
def db_inputs(connection_type):
"""
a function to determine the type of database the user wants to work with and
selects the corresponding connection to the db
Args:
connection_type: type of db we are working with
Returns:
corresponding command to connect to the db
"""
context = Config.db.as_dict()
return {
"postgres": "postgresql+psycopg2://{username}:{password}@{host}:{port}/{name}?sslmode={ssl_mode}".format(
**context
),
"mysql": "mysql+pymysql://{username}:{password}@{host}:{port}/{name}".format(**context),
"sqlite": "sqlite:///{name}".format(**context),
}[connection_type]
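# Example (illustrative; the URL fields come from Config.db, shown here with
# hypothetical values, not ones from this codebase):
#
#   db_inputs("sqlite")
#   # -> "sqlite:///nettacker.db"  (when Config.db.name == "nettacker.db")
#   db_inputs("postgres")
#   # -> "postgresql+psycopg2://user:secret@localhost:5432/nettacker?sslmode=disable"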
def create_connection():
"""
a function to create connections to db with pessimistic approach
For sqlite, it creates and returns a sqlite connection object
for mysql and postgresql, it returns the connection or False if
connection failed.
"""
if Config.db.engine.startswith("sqlite") and Config.settings.use_apsw_for_sqlite:
if apsw is None:
raise ImportError("APSW is required for SQLite backend.")
# In case of sqlite, the name parameter is the database path
try:
DB_PATH = config.db.as_dict()["name"]
connection = apsw.Connection(DB_PATH)
connection.setbusytimeout(int(config.settings.timeout) * 100)
cursor = connection.cursor()
            # Performance-enhancing configuration: WAL journaling helps with concurrency
cursor.execute(f"PRAGMA journal_mode={Config.db.journal_mode}")
cursor.execute(f"PRAGMA synchronous={Config.db.synchronous_mode}")
return connection, cursor
except Exception as e:
logger.error(f"Failed to create APSW connection: {e}")
raise
else:
connection_args = {}
if Config.db.engine.startswith("sqlite"):
connection_args["check_same_thread"] = False
db_engine = create_engine(
db_inputs(Config.db.engine),
connect_args=connection_args,
pool_size=50,
pool_pre_ping=True,
)
Session = sessionmaker(bind=db_engine)
return Session()
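# Example (illustrative): callers dispatch on the return type, since the APSW
# path yields a (connection, cursor) tuple and every other backend yields a
# SQLAlchemy session; this is the pattern used throughout this module.
#
#   session = create_connection()
#   if isinstance(session, tuple):
#       connection, cursor = session  # raw APSW handles
#   else:
#       ...                           # SQLAlchemy session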
def send_submit_query(session):
"""
a function to send submit based queries to db
(such as insert and update or delete), it retries 100 times if
connection returned an error.
Args:
session: session to commit
Returns:
True if submitted success otherwise False
"""
    if isinstance(session, tuple):
        connection, cursor = session
        try:
            for _ in range(100):
                try:
                    connection.execute("COMMIT")
                    return True
                except Exception:
                    connection.execute("ROLLBACK")
                    time.sleep(0.1)
            logger.warn(messages("database_connect_fail"))
            return False
        finally:
            connection.close()
    else:
        try:
            for _ in range(100):
                try:
                    session.commit()
                    return True
                except Exception:
                    time.sleep(0.1)
            logger.warn(messages("database_connect_fail"))
            return False
        except Exception:
            logger.warn(messages("database_connect_fail"))
            return False
def submit_report_to_db(event):
"""
this function created to submit the generated reports into db, the
files are not stored in db, just the path!
Args:
event: event log
Returns:
return True if submitted otherwise False
"""
logger.verbose_info(messages("inserting_report_db"))
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
try:
cursor.execute("BEGIN")
cursor.execute(
"""
INSERT INTO reports (date, scan_unique_id, report_path_filename, options)
VALUES (?, ?, ?, ?)
""",
(
str(event["date"]),
event["scan_id"],
event["options"]["report_path_filename"],
json.dumps(event["options"]),
),
)
return send_submit_query(session)
except Exception:
cursor.execute("ROLLBACK")
logger.warn("Could not insert report...")
return False
finally:
cursor.close()
connection.close()
else:
session.add(
Report(
date=event["date"],
scan_unique_id=event["scan_id"],
report_path_filename=event["options"]["report_path_filename"],
options=json.dumps(event["options"]),
)
)
return send_submit_query(session)
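# Example (illustrative; the field values are hypothetical, but the keys match
# what this function reads from the event):
#
#   submit_report_to_db(
#       {
#           "date": "2024-01-01 00:00:00",
#           "scan_id": "aabbccdd",
#           "options": {"report_path_filename": "/tmp/results.html"},
#       }
#   )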
def remove_old_logs(options):
"""
this function remove old events (and duplicated)
from nettacker.database based on target, module, scan_id
Args:
options: identifiers
Returns:
True if success otherwise False
"""
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
try:
cursor.execute("BEGIN")
cursor.execute(
"""
DELETE FROM scan_events
WHERE target = ?
AND module_name = ?
AND scan_unique_id != ?
AND scan_unique_id != ?
""",
(
options["target"],
options["module_name"],
options["scan_id"],
options["scan_compare_id"],
),
)
return send_submit_query(session)
except Exception:
cursor.execute("ROLLBACK")
logger.warn("Could not remove old logs...")
return False
finally:
cursor.close()
connection.close()
else:
session.query(HostsLog).filter(
HostsLog.target == options["target"],
HostsLog.module_name == options["module_name"],
HostsLog.scan_unique_id != options["scan_id"],
HostsLog.scan_unique_id != options["scan_compare_id"],
# Don't remove old logs if they are to be used for the scan reports
).delete(synchronize_session=False)
return send_submit_query(session)
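# Example (illustrative; hypothetical values with the keys this function reads
# from options):
#
#   remove_old_logs(
#       {
#           "target": "192.168.0.1",
#           "module_name": "port_scan",
#           "scan_id": "aabbccdd",
#           "scan_compare_id": "11223344",
#       }
#   )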
def submit_logs_to_db(log):
"""
this function created to submit new events into database.
This requires a little more robust handling in case of
APSW in order to avoid database lock issues.
Args:
log: log event in JSON type
Returns:
True if success otherwise False
"""
if isinstance(log, dict):
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
try:
for _ in range(Config.settings.max_retries):
try:
if not connection.in_transaction:
connection.execute("BEGIN")
cursor.execute(
"""
INSERT INTO scan_events (target, date, module_name, scan_unique_id, port, event, json_event)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
log["target"],
str(log["date"]),
log["module_name"],
log["scan_id"],
json.dumps(log["port"]),
json.dumps(log["event"]),
json.dumps(log["json_event"]),
),
)
return send_submit_query(session)
except apsw.BusyError as e:
if "database is locked" in str(e).lower():
logger.warn(
f"[Retry {_ + 1}/{Config.settings.max_retries}] Database is locked. Retrying..."
)
if connection.in_transaction:
connection.execute("ROLLBACK")
time.sleep(Config.settings.retry_delay)
continue
else:
if connection.in_transaction:
connection.execute("ROLLBACK")
return False
except Exception:
try:
if connection.in_transaction:
connection.execute("ROLLBACK")
except Exception:
pass
return False
                # All retries exhausted, but we want to continue operation
                logger.warn("All retries exhausted. Skipping this log.")
return True
finally:
cursor.close()
connection.close()
else:
session.add(
HostsLog(
target=log["target"],
date=log["date"],
module_name=log["module_name"],
scan_unique_id=log["scan_id"],
port=json.dumps(log["port"]),
event=json.dumps(log["event"]),
json_event=json.dumps(log["json_event"]),
)
)
return send_submit_query(session)
else:
logger.warn(messages("invalid_json_type_to_db").format(log))
return False
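# Example (illustrative; hypothetical values with the keys this function
# expects in the log dict):
#
#   submit_logs_to_db(
#       {
#           "target": "192.168.0.1",
#           "date": "2024-01-01 00:00:00",
#           "module_name": "port_scan",
#           "scan_id": "aabbccdd",
#           "port": 80,
#           "event": "port 80 is open",
#           "json_event": {"port": 80, "state": "open"},
#       }
#   )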
def submit_temp_logs_to_db(log):
"""
this function created to submit new events into database.
This requires a little more robust handling in case of
APSW in order to avoid database lock issues.
Args:
log: log event in JSON type
Returns:
True if success otherwise False
"""
if isinstance(log, dict):
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
try:
for _ in range(Config.settings.max_retries):
try:
if not connection.in_transaction:
cursor.execute("BEGIN")
cursor.execute(
"""
INSERT INTO temp_events (target, date, module_name, scan_unique_id, event_name, port, event, data)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
log["target"],
str(log["date"]),
log["module_name"],
log["scan_id"],
log["event_name"],
json.dumps(log["port"]),
json.dumps(log["event"]),
json.dumps(log["data"]),
),
)
return send_submit_query(session)
except apsw.BusyError as e:
if "database is locked" in str(e).lower():
logger.warn(
f"[Retry {_ + 1}/{Config.settings.max_retries}] Database is locked. Retrying..."
)
try:
if connection.in_transaction:
connection.execute("ROLLBACK")
except Exception:
pass
time.sleep(Config.settings.retry_delay)
continue
else:
try:
if connection.in_transaction:
connection.execute("ROLLBACK")
except Exception:
pass
return False
except Exception:
try:
if connection.in_transaction:
connection.execute("ROLLBACK")
except Exception:
pass
return False
                # All retries exhausted, but we want to continue operation
                logger.warn("All retries exhausted. Skipping this log.")
return True
finally:
cursor.close()
connection.close()
else:
session.add(
TempEvents(
target=log["target"],
date=log["date"],
module_name=log["module_name"],
scan_unique_id=log["scan_id"],
event_name=log["event_name"],
port=json.dumps(log["port"]),
event=json.dumps(log["event"]),
data=json.dumps(log["data"]),
)
)
return send_submit_query(session)
else:
logger.warn(messages("invalid_json_type_to_db").format(log))
return False
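# Example (illustrative): same shape as submit_logs_to_db, plus the
# "event_name" and "data" keys that temp events carry (values hypothetical).
#
#   submit_temp_logs_to_db(
#       {
#           "target": "192.168.0.1",
#           "date": "2024-01-01 00:00:00",
#           "module_name": "port_scan",
#           "scan_id": "aabbccdd",
#           "event_name": "open_port",
#           "port": 80,
#           "event": "port 80 is open",
#           "data": {"response_time": 0.2},
#       }
#   )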
def find_temp_events(target, module_name, scan_id, event_name):
"""
select all events by scan_unique id, target, module_name
Args:
target: target
module_name: module name
scan_id: unique scan identifier
event_name: event_name
Returns:
an array with JSON events or an empty array
"""
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
try:
cursor.execute(
"""
SELECT event
FROM temp_events
WHERE target = ? AND module_name = ? AND scan_unique_id = ? AND event_name = ?
LIMIT 1
""",
(target, module_name, scan_id, event_name),
)
            row = cursor.fetchone()
            cursor.close()
            connection.close()
            return row[0] if row else []
        except Exception:
            logger.warn(messages("database_connect_fail"))
            return []
else:
result = (
session.query(TempEvents)
.filter(
TempEvents.target == target,
TempEvents.module_name == module_name,
TempEvents.scan_unique_id == scan_id,
TempEvents.event_name == event_name,
)
.first()
)
return result.event if result else []
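# Example (illustrative; hypothetical identifiers):
#
#   event = find_temp_events("192.168.0.1", "port_scan", "aabbccdd", "open_port")
#   # -> the stored event (a JSON string) or [] if nothing matched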
def find_events(target, module_name, scan_id):
"""
select all events by scan_unique id, target, module_name
Args:
target: target
module_name: module name
scan_id: unique scan identifier
Returns:
an array with JSON events or an empty array
"""
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
try:
cursor.execute(
"""
SELECT json_event FROM scan_events
WHERE target = ? AND module_name = ? and scan_unique_id = ?
""",
(target, module_name, scan_id),
)
rows = cursor.fetchall()
cursor.close()
connection.close()
if rows:
                return [json.dumps(json.loads(row[0])) for row in rows]
return []
except Exception:
logger.warn("Database query failed...")
return []
else:
return [
row.json_event
for row in session.query(HostsLog)
.filter(
HostsLog.target == target,
HostsLog.module_name == module_name,
HostsLog.scan_unique_id == scan_id,
)
.all()
]
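# Example (illustrative; hypothetical identifiers):
#
#   events = find_events("192.168.0.1", "port_scan", "aabbccdd")
#   # -> a list of JSON event strings, one per matching scan_events row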
def select_reports(page):
"""
this function created to crawl into submitted results,
it shows last 10 results submitted in the database.
you may change the page (default 1) to go to next/previous page.
Args:
page: page number
Returns:
list of events in array and JSON type, otherwise an error in JSON type.
"""
selected = []
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
offset = (page - 1) * 10
try:
cursor.execute(
"""
SELECT id, date, scan_unique_id, report_path_filename, options
FROM reports
ORDER BY id DESC
LIMIT 10 OFFSET ?
""",
(offset,),
)
rows = cursor.fetchall()
cursor.close()
connection.close()
for row in rows:
tmp = {
"id": row[0],
"date": str(row[1]),
"scan_id": row[2],
"report_path_filename": row[3],
"options": json.loads(row[4]),
}
selected.append(tmp)
return selected
except Exception:
logger.warn("Could not retrieve report...")
return structure(status="error", msg="database error!")
else:
try:
search_data = (
session.query(Report).order_by(Report.id.desc()).offset((page * 10) - 10).limit(10)
)
for data in search_data:
tmp = {
"id": data.id,
"date": data.date,
"scan_id": data.scan_unique_id,
"report_path_filename": data.report_path_filename,
"options": json.loads(data.options),
}
selected.append(tmp)
except Exception:
return structure(status="error", msg="database error!")
return selected
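# Example (illustrative): pages are 1-based and 10 rows wide, so page 2
# returns report rows 11-20, newest first.
#
#   reports = select_reports(2)
#   # -> [{"id": ..., "date": ..., "scan_id": ..., "report_path_filename": ...,
#   #      "options": {...}}, ...]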
def get_scan_result(id):
"""
this function created to download results by the result ID.
Args:
id: scan id
Returns:
result file content (TEXT, HTML, JSON) if success otherwise and error in JSON type.
"""
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
cursor.execute(
"""
SELECT report_path_filename from reports
WHERE id = ?
""",
(id,),
)
row = cursor.fetchone()
cursor.close()
connection.close()
if row:
filename = row[0]
            try:
                with open(str(filename), "rb") as report_file:
                    return filename, report_file.read()
except IOError as e:
logger.error(f"Failed to read report file: {e}")
return None
else:
return structure(status="error", msg="database error!")
else:
report = session.query(Report).filter_by(id=id).first()
if not report:
return None
        try:
            with open(str(report.report_path_filename), "rb") as report_file:
                return report.report_path_filename, report_file.read()
except IOError as e:
logger.error(f"Failed to read report file: {e}")
return None
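# Example (illustrative; a hypothetical report id): when the row and file
# exist, the result is a (filename, content) tuple.
#
#   result = get_scan_result(1)
#   if result is not None:
#       filename, content = result  # path and raw bytes of the report file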
def last_host_logs(page):
"""
this function created to select the last 10 events from the database.
you can goto next page by changing page value.
Args:
page: page number
Returns:
an array of events in JSON type if success otherwise an error in JSON type
"""
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
try:
cursor.execute(
"""
SELECT DISTINCT target
FROM scan_events
ORDER BY id DESC
LIMIT 10 OFFSET ?
""",
[(page - 1) * 10],
)
targets = cursor.fetchall()
if not targets:
return structure(status="finished", msg="No more search results")
hosts = []
for (target,) in targets:
cursor.execute(
"""
SELECT DISTINCT module_name
FROM scan_events
WHERE target = ?
""",
[target],
)
module_names = [row[0] for row in cursor.fetchall()]
cursor.execute(
"""
SELECT date
FROM scan_events
WHERE target = ?
ORDER BY id DESC
LIMIT 1
""",
[target],
)
latest_date = cursor.fetchone()
latest_date = latest_date[0] if latest_date else None
cursor.execute(
"""
SELECT event
FROM scan_events
WHERE target = ?
""",
[target],
)
events = [row[0] for row in cursor.fetchall()]
hosts.append(
{
"target": target,
"info": {
"module_name": module_names,
"date": latest_date,
"events": events,
},
}
)
cursor.close()
connection.close()
return hosts
except Exception:
logger.warn("Database query failed...")
return structure(status="error", msg="Database error!")
else:
hosts = [
{
"target": host.target,
"info": {
"module_name": [
_.module_name
for _ in session.query(HostsLog)
.filter(HostsLog.target == host.target)
.group_by(HostsLog.module_name)
.all()
],
"date": session.query(HostsLog)
.filter(HostsLog.target == host.target)
.order_by(HostsLog.id.desc())
.first()
.date,
# "options": [ # unnecessary data?
# _.options for _ in session.query(HostsLog).filter(
# HostsLog.target == host.target
# ).all()
# ],
"events": [
_.event
for _ in session.query(HostsLog)
.filter(HostsLog.target == host.target)
.all()
],
},
}
for host in session.query(HostsLog)
.group_by(HostsLog.target)
.order_by(HostsLog.id.desc())
.offset((page * 10) - 10)
.limit(10)
]
if len(hosts) == 0:
return structure(status="finished", msg="No more search results")
return hosts
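# Example (illustrative): one entry per host, aggregating that host's modules,
# latest date and events.
#
#   hosts = last_host_logs(1)
#   # -> [{"target": "192.168.0.1",
#   #      "info": {"module_name": [...], "date": ..., "events": [...]}}, ...]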
def get_logs_by_scan_id(scan_id):
"""
select all events by scan id hash
Args:
scan_id: scan id hash
Returns:
an array with JSON events or an empty array
"""
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
cursor.execute(
"""
SELECT scan_unique_id, target, module_name, date, port, event, json_event
from scan_events
WHERE scan_unique_id = ?
""",
            (scan_id,),  # a one-element tuple is required for parameter binding
)
rows = cursor.fetchall()
cursor.close()
connection.close()
return [
{
"scan_id": row[0],
"target": row[1],
"module_name": row[2],
"date": str(row[3]),
"port": json.loads(row[4]),
"event": json.loads(row[5]),
"json_event": json.loads(row[6]) if row[6] else {},
}
for row in rows
]
else:
return [
{
"scan_id": scan_id,
"target": log.target,
"module_name": log.module_name,
"date": str(log.date),
"port": json.loads(log.port),
"event": json.loads(log.event),
"json_event": log.json_event,
}
for log in session.query(HostsLog).filter(HostsLog.scan_unique_id == scan_id).all()
]
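# Example (illustrative; a hypothetical scan id hash):
#
#   logs = get_logs_by_scan_id("aabbccdd")
#   # -> [{"scan_id": "aabbccdd", "target": ..., "module_name": ...,
#   #      "date": ..., "port": ..., "event": ..., "json_event": ...}, ...]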
def get_options_by_scan_id(scan_id):
"""
select all stored options of the scan by scan id hash
Args:
scan_id: scan id hash
Returns:
an array with a dict with stored options or an empty array
"""
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
cursor.execute(
"""
SELECT options from reports
WHERE scan_unique_id = ?
""",
(scan_id,),
)
rows = cursor.fetchall()
cursor.close()
connection.close()
        if rows:
            return [{"options": row[0]} for row in rows]
        return []
else:
return [
{"options": log.options}
for log in session.query(Report).filter(Report.scan_unique_id == scan_id).all()
]
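# Example (illustrative):
#
#   get_options_by_scan_id("aabbccdd")
#   # -> [{"options": "<JSON string of the stored scan options>"}] or []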
def logs_to_report_json(target):
"""
select all reports of a host
Args:
host: the host to search
Returns:
an array with JSON events or an empty array
"""
try:
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
return_logs = []
cursor.execute(
"""
SELECT scan_unique_id, target, port, event, json_event
FROM scan_events WHERE target = ?
""",
(target,),
)
rows = cursor.fetchall()
cursor.close()
connection.close()
if rows:
for log in rows:
data = {
"scan_id": log[0],
"target": log[1],
"port": json.loads(log[2]),
"event": json.loads(log[3]),
"json_event": json.loads(log[4]),
}
return_logs.append(data)
return return_logs
else:
return_logs = []
logs = session.query(HostsLog).filter(HostsLog.target == target)
for log in logs:
data = {
"scan_id": log.scan_unique_id,
"target": log.target,
"port": json.loads(log.port),
"event": json.loads(log.event),
"json_event": json.loads(log.json_event),
}
return_logs.append(data)
return return_logs
except Exception:
return []
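# Example (illustrative; a hypothetical target):
#
#   logs_to_report_json("192.168.0.1")
#   # -> [{"scan_id": ..., "target": "192.168.0.1", "port": ...,
#   #      "event": ..., "json_event": ...}, ...] or [] on any failure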
def logs_to_report_html(target):
"""
generate HTML report with d3_tree_v2_graph for a host
Args:
target: the target
Returns:
HTML report
"""
from nettacker.core.graph import build_graph
from nettacker.lib.html_log import log_data
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
cursor.execute(
"""
SELECT date, target, module_name, scan_unique_id, port, event, json_event
FROM scan_events
WHERE target = ?
""",
(target,),
)
rows = cursor.fetchall()
cursor.close()
connection.close()
logs = [
{
"date": log[0],
"target": log[1],
"module_name": log[2],
"scan_id": log[3],
"port": log[4],
"event": log[5],
"json_event": log[6],
}
for log in rows
]
html_graph = build_graph("d3_tree_v2_graph", logs)
html_content = log_data.table_title.format(
html_graph,
log_data.css_1,
"date",
"target",
"module_name",
"scan_id",
"port",
"event",
"json_event",
)
for event in logs:
html_content += log_data.table_items.format(
event["date"],
event["target"],
event["module_name"],
event["scan_id"],
event["port"],
event["event"],
event["json_event"],
)
html_content += (
log_data.table_end + '<p class="footer">' + messages("nettacker_report") + "</p>"
)
return html_content
else:
logs = [
{
"date": log.date,
"target": log.target,
"module_name": log.module_name,
"scan_id": log.scan_unique_id,
"port": log.port,
"event": log.event,
"json_event": log.json_event,
}
for log in session.query(HostsLog).filter(HostsLog.target == target).all()
]
html_graph = build_graph("d3_tree_v2_graph", logs)
html_content = log_data.table_title.format(
html_graph,
log_data.css_1,
"date",
"target",
"module_name",
"scan_id",
"port",
"event",
"json_event",
)
for event in logs:
html_content += log_data.table_items.format(
event["date"],
event["target"],
event["module_name"],
event["scan_id"],
event["port"],
event["event"],
event["json_event"],
)
html_content += (
log_data.table_end + '<p class="footer">' + messages("nettacker_report") + "</p>"
)
return html_content
def search_logs(page, query):
"""
search in events (host, date, port, module, category, description,
username, password, scan_id, scan_cmd)
Args:
page: page number
query: query to search
Returns:
an array with JSON structure of founded events or an empty array
"""
selected = []
session = create_connection()
if isinstance(session, tuple):
connection, cursor = session
try:
# Fetch targets matching the query
cursor.execute(
"""
SELECT DISTINCT target FROM scan_events
WHERE target LIKE ? OR date LIKE ? OR module_name LIKE ?
OR port LIKE ? OR event LIKE ? OR scan_unique_id LIKE ?
ORDER BY id DESC
LIMIT 10 OFFSET ?
""",
(
f"%{query}%",
f"%{query}%",
f"%{query}%",
f"%{query}%",
f"%{query}%",
f"%{query}%",
(page * 10) - 10,
),
)
targets = cursor.fetchall()
for target_row in targets:
target = target_row[0]
# Fetch data for each target grouped by key fields
cursor.execute(
"""
SELECT date, module_name, port, event, json_event FROM scan_events
WHERE target = ?
GROUP BY module_name, port, scan_unique_id, event
ORDER BY id DESC
""",
(target,),
)
results = cursor.fetchall()
tmp = {
"target": target,
"info": {
"module_name": [],
"port": [],
"date": [],
"event": [],
"json_event": [],
},
}
for data in results:
date, module_name, port, event, json_event = data
if module_name not in tmp["info"]["module_name"]:
tmp["info"]["module_name"].append(module_name)
if date not in tmp["info"]["date"]:
tmp["info"]["date"].append(date)
parsed_port = json.loads(port)
if parsed_port not in tmp["info"]["port"]:
tmp["info"]["port"].append(parsed_port)
parsed_event = json.loads(event)
if parsed_event not in tmp["info"]["event"]:
tmp["info"]["event"].append(parsed_event)
parsed_json_event = json.loads(json_event)
if parsed_json_event not in tmp["info"]["json_event"]:
tmp["info"]["json_event"].append(parsed_json_event)
selected.append(tmp)
cursor.close()
connection.close()
except Exception:
return structure(status="error", msg="database error!")
if len(selected) == 0:
return structure(status="finished", msg="No more search results")
return selected
else:
try:
for host in (
session.query(HostsLog)
.filter(
(HostsLog.target.like("%" + str(query) + "%"))
| (HostsLog.date.like("%" + str(query) + "%"))
| (HostsLog.module_name.like("%" + str(query) + "%"))
| (HostsLog.port.like("%" + str(query) + "%"))
| (HostsLog.event.like("%" + str(query) + "%"))
| (HostsLog.scan_unique_id.like("%" + str(query) + "%"))
)
.group_by(HostsLog.target)
.order_by(HostsLog.id.desc())
.offset((page * 10) - 10)
.limit(10)
):
for data in (
session.query(HostsLog)
.filter(HostsLog.target == str(host.target))
.group_by(
HostsLog.module_name,
HostsLog.port,
HostsLog.scan_unique_id,
HostsLog.event,
)
.order_by(HostsLog.id.desc())
.all()
):
                    # find the index of an existing entry for this target,
                    # creating one if it does not exist yet
                    capture = next(
                        (
                            index
                            for index, selected_data in enumerate(selected)
                            if selected_data["target"] == host.target
                        ),
                        None,
                    )
                    if capture is None:
                        selected.append(
                            {
                                "target": data.target,
                                "info": {
                                    "module_name": [],
                                    "port": [],
                                    "date": [],
                                    "event": [],
                                    "json_event": [],
                                },
                            }
                        )
                        capture = len(selected) - 1
                    if data.target == selected[capture]["target"]:
                        if data.module_name not in selected[capture]["info"]["module_name"]:
                            selected[capture]["info"]["module_name"].append(data.module_name)
                        if data.date not in selected[capture]["info"]["date"]:
                            selected[capture]["info"]["date"].append(data.date)
                        parsed_port = json.loads(data.port)
                        if parsed_port not in selected[capture]["info"]["port"]:
                            selected[capture]["info"]["port"].append(parsed_port)
                        parsed_event = json.loads(data.event)
                        if parsed_event not in selected[capture]["info"]["event"]:
                            selected[capture]["info"]["event"].append(parsed_event)
                        parsed_json_event = json.loads(data.json_event)
                        if parsed_json_event not in selected[capture]["info"]["json_event"]:
                            selected[capture]["info"]["json_event"].append(parsed_json_event)
except Exception:
return structure(status="error", msg="database error!")
if len(selected) == 0:
return structure(status="finished", msg="No more search results")
return selected
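# Example (illustrative): free-text search across targets, dates, module
# names, ports, events and scan ids, paged 10 hosts at a time.
#
#   results = search_logs(1, "port_scan")
#   # -> [{"target": ..., "info": {"module_name": [...], "port": [...],
#   #      "date": [...], "event": [...], "json_event": [...]}}, ...]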