5 changes: 5 additions & 0 deletions api/CHANGELOG.md
@@ -4,9 +4,14 @@ All notable changes to the **Prowler API** are documented in this file.

## [1.25.3] (Prowler v5.24.3)

### 🚀 Added

- `/overviews/findings`, `/overviews/findings-severity` and `/overviews/services` now reflect newly-muted findings without waiting for the next scan. The post-mute `reaggregate-all-finding-group-summaries` task was extended to re-run the same per-scan pipeline that scan completion runs (`ScanSummary`, `DailySeveritySummary`, `FindingGroupDailySummary`) on the latest scan of every `(provider, day)` pair, keeping the pre-aggregated tables in sync with `Finding.muted` updates [(#10827)](https://github.com/prowler-cloud/prowler/pull/10827)

### 🐞 Fixed

- Finding groups aggregated `status` now treats muted findings as resolved: a group is `FAIL` only while at least one non-muted FAIL remains, otherwise it is `PASS` (including fully-muted groups). The `filter[status]` filter and the `sort=status` ordering share the same semantics, keeping `status` consistent with `fail_count` and the orthogonal `muted` flag [(#10825)](https://github.com/prowler-cloud/prowler/pull/10825)
- `aggregate_findings` is now idempotent: it deletes the scan's existing `ScanSummary` rows before `bulk_create`, so re-runs (such as the post-mute reaggregation pipeline) no longer violate the `unique_scan_summary` constraint and no longer abort the downstream `DailySeveritySummary` / `FindingGroupDailySummary` recomputation for the affected scan [(#10827)](https://github.com/prowler-cloud/prowler/pull/10827)

---
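For reference, a minimal sketch of the aggregated `status` semantics described in the Fixed entry above. This is illustrative plain Python, not the ORM annotation the API actually uses; the helper name and the finding row shape are assumptions.

```python
from dataclasses import dataclass


@dataclass
class FindingRow:
    status: str  # "PASS" or "FAIL"
    muted: bool


def aggregate_group_status(findings: list[FindingRow]) -> str:
    """A group is FAIL only while at least one non-muted FAIL remains.

    Fully-muted groups, and groups whose only FAILs are muted, resolve to
    PASS, which keeps `status` consistent with the orthogonal `muted` flag
    described in the changelog entry.
    """
    has_active_fail = any(f.status == "FAIL" and not f.muted for f in findings)
    return "FAIL" if has_active_fail else "PASS"


# A group with one muted FAIL and one PASS reads as PASS.
assert aggregate_group_status(
    [FindingRow("FAIL", muted=True), FindingRow("PASS", muted=False)]
) == "PASS"
```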

3 changes: 3 additions & 0 deletions api/src/backend/tasks/jobs/scan.py
@@ -1198,6 +1198,9 @@ def aggregate_findings(tenant_id: str, scan_id: str):
)
for agg in aggregation
}
# Delete first so re-runs (e.g. post-mute reaggregation) don't hit
# the `unique_scan_summary` constraint.
ScanSummary.objects.filter(tenant_id=tenant_id, scan_id=scan_id).delete()
ScanSummary.objects.bulk_create(scan_aggregations, batch_size=3000)
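
The two lines added above are the whole fix. A hedged sketch of the same delete-then-recreate pattern follows, wrapped in a transaction so readers never observe a scan with zero summary rows mid-rerun; the `transaction.atomic()` wrapper, the helper name, and the import path are assumptions not shown in this diff.

```python
from django.db import transaction

from api.models import ScanSummary  # assumed import path for the model


def replace_scan_summaries(tenant_id: str, scan_id: str, rows: list[ScanSummary]) -> None:
    # One transaction keeps the delete and the bulk insert atomic for readers.
    with transaction.atomic():
        # Drop rows left by a previous run (e.g. post-mute reaggregation) so
        # the `unique_scan_summary` constraint cannot be violated below.
        ScanSummary.objects.filter(tenant_id=tenant_id, scan_id=scan_id).delete()
        ScanSummary.objects.bulk_create(rows, batch_size=3000)
```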


36 changes: 27 additions & 9 deletions api/src/backend/tasks/tasks.py
@@ -771,15 +771,22 @@ def aggregate_finding_group_summaries_task(tenant_id: str, scan_id: str):
)
@set_tenant(keep_tenant=True)
def reaggregate_all_finding_group_summaries_task(tenant_id: str):
"""Reaggregate finding group summaries for every (provider, day) combination.
"""Reaggregate every pre-aggregated summary table for this tenant.

Mirrors the unbounded scope of `mute_historical_findings_task`: that task
rewrites every Finding row whose UID matches a mute rule, with no time
limit. To keep the daily summaries consistent with that update, this task
re-runs the aggregator on the latest completed scan of every (provider,
day) pair that exists in the database. Tasks are dispatched in parallel
via a Celery group so the wallclock scales with the worker pool, not with
the number of pairs.
limit. To keep the pre-aggregated tables consistent with that update,
this task re-runs the same per-scan aggregation pipeline that scan
completion runs on the latest completed scan of every (provider, day)
pair, rebuilding the three tables that power the read endpoints:

- `ScanSummary` and `DailySeveritySummary` -> `/overviews/findings`,
`/overviews/findings-severity`, `/overviews/services`.
- `FindingGroupDailySummary` -> `/finding-groups` and
`/finding-groups/latest`.

Per-scan pipelines are dispatched in parallel via a Celery group so
wallclock scales with the worker pool.
"""
completed_scans = list(
Scan.objects.filter(
@@ -804,12 +811,23 @@ def reaggregate_all_finding_group_summaries_task(tenant_id: str):
scan_ids = list(latest_scans.values())
if scan_ids:
logger.info(
"Reaggregating finding group summaries for %d scans (provider x day)",
"Reaggregating overview/finding summaries for %d scans (provider x day)",
len(scan_ids),
)
# DailySeveritySummary reads from ScanSummary, so ScanSummary must be
# recomputed first; FindingGroupDailySummary reads from Finding
# directly and can run in parallel with the severity step.
group(
aggregate_finding_group_summaries_task.si(
tenant_id=tenant_id, scan_id=scan_id
chain(
perform_scan_summary_task.si(tenant_id=tenant_id, scan_id=scan_id),
group(
aggregate_daily_severity_task.si(
tenant_id=tenant_id, scan_id=scan_id
),
aggregate_finding_group_summaries_task.si(
tenant_id=tenant_id, scan_id=scan_id
),
),
)
for scan_id in scan_ids
).apply_async()
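
The dedup that feeds this dispatch loop sits outside the hunks shown above. Below is a hedged sketch of the "latest completed scan per (provider, day)" selection, assuming the ordered list-of-dicts shape the tests further down mock (`id`, `provider_id`, `completed_at`, ordered by `-completed_at`); the function name is illustrative.

```python
from datetime import date
from uuid import UUID


def latest_scan_per_provider_day(completed_scans: list[dict]) -> dict[tuple[UUID, date], str]:
    """Map (provider_id, completion date) -> id of that day's most recent scan.

    `completed_scans` is assumed to be ordered by `-completed_at`, so the
    first scan seen for a (provider, day) pair is the latest one and later
    entries for the same pair are ignored.
    """
    latest: dict[tuple[UUID, date], str] = {}
    for scan in completed_scans:
        key = (scan["provider_id"], scan["completed_at"].date())
        latest.setdefault(key, str(scan["id"]))
    return latest
```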
59 changes: 59 additions & 0 deletions api/src/backend/tasks/tests/test_scan.py
@@ -36,6 +36,7 @@
Provider,
Resource,
Scan,
ScanSummary,
StateChoices,
StatusChoices,
)
@@ -3358,6 +3359,64 @@ def test_aggregate_findings_groups_by_dimensions(
regions = {s.region for s in summaries}
assert regions == {"us-east-1", "us-west-2"}

def test_aggregate_findings_is_idempotent_on_rerun(
self,
tenants_fixture,
scans_fixture,
findings_fixture,
):
"""Re-running `aggregate_findings` for the same scan must not violate
the `unique_scan_summary` constraint, and the resulting row set for
the scan must match the single-run output. This is exercised by the
post-mute reaggregation pipeline, which re-dispatches
`perform_scan_summary_task` against scans whose summaries already
exist."""
tenant = tenants_fixture[0]
scan = scans_fixture[0]

aggregate_findings(str(tenant.id), str(scan.id))
first_run_ids = set(
ScanSummary.all_objects.filter(
tenant_id=tenant.id, scan_id=scan.id
).values_list("id", flat=True)
)
first_run_rows = list(
ScanSummary.all_objects.filter(tenant_id=tenant.id, scan_id=scan.id).values(
"check_id",
"service",
"severity",
"region",
"fail",
"_pass",
"muted",
"total",
)
)

# Second invocation must not raise and must replace the rows without
# leaving duplicates behind.
aggregate_findings(str(tenant.id), str(scan.id))
second_run_ids = set(
ScanSummary.all_objects.filter(
tenant_id=tenant.id, scan_id=scan.id
).values_list("id", flat=True)
)
second_run_rows = list(
ScanSummary.all_objects.filter(tenant_id=tenant.id, scan_id=scan.id).values(
"check_id",
"service",
"severity",
"region",
"fail",
"_pass",
"muted",
"total",
)
)

assert second_run_rows == first_run_rows
assert first_run_ids.isdisjoint(second_run_ids)


@pytest.mark.django_db
class TestAggregateFindingsByRegion:
88 changes: 66 additions & 22 deletions api/src/backend/tasks/tests/test_tasks.py
@@ -2359,11 +2359,20 @@ class TestReaggregateAllFindingGroupSummaries:
def setup_method(self):
self.tenant_id = str(uuid.uuid4())

@patch("tasks.tasks.chain")
@patch("tasks.tasks.group")
@patch("tasks.tasks.aggregate_finding_group_summaries_task")
@patch("tasks.tasks.aggregate_daily_severity_task")
@patch("tasks.tasks.perform_scan_summary_task")
@patch("tasks.tasks.Scan.objects.filter")
def test_dispatches_subtasks_for_each_provider_per_day(
self, mock_scan_filter, mock_agg_task, mock_group
self,
mock_scan_filter,
mock_scan_summary_task,
mock_daily_severity_task,
mock_finding_group_task,
mock_group,
mock_chain,
):
provider_id_1 = uuid.uuid4()
provider_id_2 = uuid.uuid4()
@@ -2373,8 +2382,13 @@ def test_dispatches_subtasks_for_each_provider_per_day(
today = datetime.now(tz=timezone.utc)
yesterday = today - timedelta(days=1)

mock_group_result = MagicMock()
mock_group.side_effect = lambda gen: (list(gen), mock_group_result)[1]
mock_outer_group_result = MagicMock()
# The first `group()` call wraps the inner (severity, finding-group)
# parallel step; subsequent calls wrap the outer per-scan generator.
mock_group.side_effect = lambda *args, **kwargs: (
list(args[0]) if args and hasattr(args[0], "__iter__") else None,
mock_outer_group_result,
)[1]

mock_scan_filter.return_value.order_by.return_value.values.return_value = [
{
@@ -2397,23 +2411,40 @@
result = reaggregate_all_finding_group_summaries_task(tenant_id=self.tenant_id)

assert result == {"scans_reaggregated": 3}
assert mock_agg_task.si.call_count == 3
mock_agg_task.si.assert_any_call(
tenant_id=self.tenant_id, scan_id=str(scan_id_today_p1)
)
mock_agg_task.si.assert_any_call(
tenant_id=self.tenant_id, scan_id=str(scan_id_today_p2)
)
mock_agg_task.si.assert_any_call(
tenant_id=self.tenant_id, scan_id=str(scan_id_yesterday_p1)
)
mock_group_result.apply_async.assert_called_once()
expected_scan_ids = {
str(scan_id_today_p1),
str(scan_id_today_p2),
str(scan_id_yesterday_p1),
}
for task_mock in (
mock_scan_summary_task,
mock_daily_severity_task,
mock_finding_group_task,
):
assert task_mock.si.call_count == 3
dispatched = {
call.kwargs["scan_id"] for call in task_mock.si.call_args_list
}
assert dispatched == expected_scan_ids
for call in task_mock.si.call_args_list:
assert call.kwargs["tenant_id"] == self.tenant_id
assert mock_chain.call_count == 3
mock_outer_group_result.apply_async.assert_called_once()

@patch("tasks.tasks.chain")
@patch("tasks.tasks.group")
@patch("tasks.tasks.aggregate_finding_group_summaries_task")
@patch("tasks.tasks.aggregate_daily_severity_task")
@patch("tasks.tasks.perform_scan_summary_task")
@patch("tasks.tasks.Scan.objects.filter")
def test_dedupes_scans_to_latest_per_provider_per_day(
self, mock_scan_filter, mock_agg_task, mock_group
self,
mock_scan_filter,
mock_scan_summary_task,
mock_daily_severity_task,
mock_finding_group_task,
mock_group,
mock_chain,
):
"""When several scans run on the same day for the same provider, only
the latest one is dispatched (matching the daily summary unique key)."""
@@ -2423,8 +2454,11 @@ def test_dedupes_scans_to_latest_per_provider_per_day(
today_late = datetime.now(tz=timezone.utc)
today_early = today_late - timedelta(hours=4)

mock_group_result = MagicMock()
mock_group.side_effect = lambda gen: (list(gen), mock_group_result)[1]
mock_outer_group_result = MagicMock()
mock_group.side_effect = lambda *args, **kwargs: (
list(args[0]) if args and hasattr(args[0], "__iter__") else None,
mock_outer_group_result,
)[1]

# Returned ordered by `-completed_at`, so the most recent comes first.
mock_scan_filter.return_value.order_by.return_value.values.return_value = [
@@ -2443,17 +2477,27 @@
result = reaggregate_all_finding_group_summaries_task(tenant_id=self.tenant_id)

assert result == {"scans_reaggregated": 1}
mock_agg_task.si.assert_called_once_with(
tenant_id=self.tenant_id, scan_id=str(latest_scan_today)
)
mock_group_result.apply_async.assert_called_once()
for task_mock in (
mock_scan_summary_task,
mock_daily_severity_task,
mock_finding_group_task,
):
task_mock.si.assert_called_once_with(
tenant_id=self.tenant_id, scan_id=str(latest_scan_today)
)
mock_chain.assert_called_once()
mock_outer_group_result.apply_async.assert_called_once()

@patch("tasks.tasks.chain")
@patch("tasks.tasks.group")
@patch("tasks.tasks.Scan.objects.filter")
def test_no_completed_scans_skips_dispatch(self, mock_scan_filter, mock_group):
def test_no_completed_scans_skips_dispatch(
self, mock_scan_filter, mock_group, mock_chain
):
mock_scan_filter.return_value.order_by.return_value.values.return_value = []

result = reaggregate_all_finding_group_summaries_task(tenant_id=self.tenant_id)

assert result == {"scans_reaggregated": 0}
mock_group.assert_not_called()
mock_chain.assert_not_called()