Skip to content

Create stats compare endpoint to compare two summaries CRASM-2584 #947

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
382 changes: 382 additions & 0 deletions backend/src/xfd_django/xfd_api/api_methods/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

# Third-Party Libraries
from django.forms.models import model_to_dict
from django.utils.timezone import now
from fastapi import HTTPException, Request
from redis import asyncio as aioredis
from xfd_api.auth import get_stats_org_ids
Expand Down Expand Up @@ -593,3 +594,384 @@ def fetch_summaries(model, exclude_fields=None):
"port_scan_service_summaries": port_services_dicts,
"vuln_scan_summaries": vuln_scan_summaries,
}


SUMMARY_CONFIG = {
"host": {
"model": HostSummary,
"fields": [
"host_done_count",
"host_waiting_count",
"host_running_count",
"host_ready_count",
"up_host_count",
"down_host_count",
"scanned_asset_count",
],
},
"port": {
"model": PortScanSummary,
"fields": [
"open_port_count",
"risky_port_count",
"nmi_service_count",
"unique_ip_count",
"unique_service_count",
],
"complex_fields": ["risky_service_group_counts"],
},
"vs": {
"model": VulnScanSummary,
"fields": [
"assets_owned_count",
"false_positive_count",
"vulnerable_host_count",
"unique_service_count",
"unique_low_severity_count",
"unique_medium_severity_count",
"unique_high_severity_count",
"unique_critical_severity_count",
"risky_services_count",
"unsupported_software_count",
"unique_os_count",
"low_severity_count",
"medium_severity_count",
"high_severity_count",
"critical_severity_count",
"critical_max_age",
"high_max_age",
"low_kev_count",
"medium_kev_count",
"high_kev_count",
"critical_kev_count",
"kev_max_age",
"one_to_five_vulns_count",
"six_to_nine_vulns_count",
"ten_plus_vulns_count",
],
"complex_fields": [
"included_tickets",
"top_5_occurring_cves",
"top_5_occurring_kevs",
"top_5_risky_hosts",
],
},
}


def get_summary_dict(summary, numeric_fields, complex_fields=None):
"""Convert the summary to a returnable dictionary format."""
if not summary:
return None
data = {field: getattr(summary, field, None) for field in numeric_fields}

if complex_fields:
for field in complex_fields:
data[field] = getattr(summary, field, None)

# Always include summary_date, start_date, and end_date if present
for date_field in ["summary_date", "start_date", "end_date"]:
if hasattr(summary, date_field):
data[date_field] = getattr(summary, date_field)

return data


def compute_deltas(
base_summary, compare_summary, fields, base_missing=False, compare_missing=False
):
"""Calculate the differences between the two summaries metric fields."""
delta = {}
for field in fields:
base_val = getattr(base_summary, field, 0) if base_summary else 0
compare_val = getattr(compare_summary, field, 0) if compare_summary else 0

base_val = base_val or 0
compare_val = compare_val or 0
count_change = compare_val - base_val

if base_missing:
percent_change = 100.0 if compare_val > 0 else 0.0
note = "Base summary not found."
elif compare_missing:
percent_change = None
note = "Compare summary not found."
elif base_val == 0 and compare_val == 0:
percent_change = 0.0
note = None
elif base_val == 0:
percent_change = 100.0
note = "Base value was 0; assumed 100% increase."
else:
percent_change = (count_change / base_val) * 100
note = None

delta[field] = {
"count_change": count_change,
"percent_change": round(percent_change, 2)
if percent_change is not None
else None,
}

if note:
delta[field]["note"] = note

return delta


def compare_risky_service_groups(base_dict, compare_dict):
"""Compare risky service group counts between two summaries."""
base_dict = base_dict or {}
compare_dict = compare_dict or {}

all_keys = set(base_dict.keys()) | set(compare_dict.keys())
result = {}

for key in all_keys:
base_val = base_dict.get(key, 0)
compare_val = compare_dict.get(key, 0)
count_change = compare_val - base_val

if base_val == 0 and compare_val > 0:
percent_change = 100.0
note = "Base value was 0; assumed 100% increase."
elif base_val == 0 and compare_val == 0:
percent_change = 0.0
note = None
else:
percent_change = (count_change / base_val) * 100
note = None

result[key] = {
"base": base_val,
"compare": compare_val,
"count_change": count_change,
"percent_change": round(percent_change, 2),
}

if note:
result[key]["note"] = note

return result


def compare_included_tickets(base_tickets, compare_tickets):
"""Identify open and closed tickets between the two summaries."""
base_tickets = base_tickets or {}
compare_tickets = compare_tickets or {}

base_ids = set(base_tickets.keys())
compare_ids = set(compare_tickets.keys())

new_ids = compare_ids - base_ids
closed_ids = base_ids - compare_ids

severity_levels = ["none", "low", "medium", "high", "critical"]

def init_counts():
"""Inititialize severity count dictionaries."""
return {sev: 0 for sev in severity_levels}

def count_severities(ticket_dict):
"""Count severities in the ticket dictionaries."""
counts = init_counts()
for ticket in ticket_dict.values():
severity = ticket.get("severity", "none").lower()
if severity in counts:
counts[severity] += 1
return counts

def compute_counts(ticket_ids, ticket_dict, reference_counts):
"""Calculate the counts of the ticket severities."""
by_severity = init_counts()
kev_by_severity = init_counts()
kev_count = 0

for tid in ticket_ids:
ticket = ticket_dict.get(tid, {})
severity = ticket.get("severity", "none").lower()
is_kev = ticket.get("is_kev", False)

if severity in by_severity:
by_severity[severity] += 1
if is_kev:
kev_by_severity[severity] += 1
if is_kev:
kev_count += 1

total = len(ticket_ids)

by_severity_percent = {}
for sev in severity_levels:
ref_total = reference_counts.get(sev, 0)
by_severity_percent[sev] = (
round((by_severity[sev] / ref_total) * 100, 2) if ref_total else 0.0
)

return {
"total_count": total,
"by_severity": by_severity,
"by_severity_percent": by_severity_percent,
"kev_count": kev_count,
"kev_by_severity": kev_by_severity,
}

# Base severity counts for denominator reference
base_severity_counts = count_severities(base_tickets)
compare_severity_counts = count_severities(compare_tickets)

new_data = compute_counts(new_ids, compare_tickets, compare_severity_counts)
closed_data = compute_counts(closed_ids, base_tickets, base_severity_counts)

total_compare = len(compare_ids)
total_base = len(base_ids)

new_data["total_percent"] = (
round((new_data["total_count"] / total_compare) * 100, 2)
if total_compare
else 0.0
)
closed_data["total_percent"] = (
round((closed_data["total_count"] / total_base) * 100, 2) if total_base else 0.0
)

return {"new": new_data, "closed": closed_data, "note": None}


def get_summary_comparison(
summary_type, organization, base_date, compare_date, enhanced_data=True
):
"""Return the Summary comparison dictionary."""
config = SUMMARY_CONFIG[summary_type]
model = config["model"]
fields = config["fields"]

base_summary = None
compare_summary = None
base_missing = False
compare_missing = False

try:
base_summary = model.objects.get(
organization=organization, summary_date=base_date
)
except model.DoesNotExist:
base_missing = True

if compare_date:
try:
compare_summary = model.objects.get(
organization=organization, summary_date=compare_date
)
except model.DoesNotExist:
compare_missing = True
else:
compare_summary = (
model.objects.filter(
organization=organization, summary_date__lte=now().date()
)
.order_by("-summary_date")
.first()
)
if not compare_summary:
compare_missing = True

if base_missing and compare_missing:
return None

complex_fields = config.get("complex_fields", []) if enhanced_data else []

result = {
"base_summary": get_summary_dict(base_summary, fields, complex_fields)
if base_summary
else None,
"compare_summary": get_summary_dict(compare_summary, fields, complex_fields)
if compare_summary
else None,
"delta": compute_deltas(
base_summary, compare_summary, fields, base_missing, compare_missing
),
}

# Add included_tickets detailed comparison ONLY for vuln summary
if summary_type == "vs":
included_tickets_base = (
getattr(base_summary, "included_tickets", None) if base_summary else {}
)
included_tickets_compare = (
getattr(compare_summary, "included_tickets", None)
if compare_summary
else {}
)

result["included_tickets_comparison"] = compare_included_tickets(
included_tickets_base, included_tickets_compare
)

if summary_type == "port":
base_group_counts = (
getattr(base_summary, "risky_service_group_counts", None)
if base_summary
else {}
)
compare_group_counts = (
getattr(compare_summary, "risky_service_group_counts", None)
if compare_summary
else {}
)

result["risky_service_group_comparison"] = compare_risky_service_groups(
base_group_counts, compare_group_counts
)

return result


def get_stats_comparison_data(filters, current_user):
"""Calculate a comparison of two requested comparisons."""
organization_id = filters.organization_id

if not is_valid_uuid(organization_id):
raise HTTPException(status_code=404, detail="Invalid organization ID.")

try:
organization = Organization.objects.get(id=organization_id)
except Organization.DoesNotExist:
raise HTTPException(status_code=404, detail="Organization not found.")

if (
not is_global_view_admin(current_user)
and not current_user.user_type == "regionalAdmin"
):
org_ids = get_org_memberships(current_user)
if uuid.UUID(organization_id) not in org_ids:
raise HTTPException(
status_code=404, detail="Access denied to requested organization."
) # User has no accessible organizations

# Regional Admins can only view vulnerabilities in their region
if current_user.user_type == "regionalAdmin" and current_user.region_id:
if organization.region_id != current_user.region_id:
raise HTTPException(
status_code=404, detail="Access denied to requested organization."
)

base_date = filters.base_date
compare_date = filters.compare_date
sources = filters.sources
enhanced_data = filters.enhanced_data if hasattr(filters, "enhanced_data") else True

results = {}

for source in sources:
if source not in SUMMARY_CONFIG:
continue

summary_data = get_summary_comparison(
source, organization, base_date, compare_date, enhanced_data=enhanced_data
)

if summary_data is not None:
results[f"{source}_scans"] = summary_data

return results
Loading
Loading