diff --git a/api/CHANGELOG.md b/api/CHANGELOG.md index 213ee215de8..aaadde12780 100644 --- a/api/CHANGELOG.md +++ b/api/CHANGELOG.md @@ -4,9 +4,14 @@ All notable changes to the **Prowler API** are documented in this file. ## [1.25.3] (Prowler v5.24.3) +### 🚀 Added + +- `/overviews/findings`, `/overviews/findings-severity` and `/overviews/services` now reflect newly-muted findings without waiting for the next scan. The post-mute `reaggregate-all-finding-group-summaries` task was extended to re-run the same per-scan pipeline that scan completion runs (`ScanSummary`, `DailySeveritySummary`, `FindingGroupDailySummary`) on the latest scan of every `(provider, day)` pair, keeping the pre-aggregated tables in sync with `Finding.muted` updates [(#10827)](https://github.com/prowler-cloud/prowler/pull/10827) + ### 🐞 Fixed - Finding groups aggregated `status` now treats muted findings as resolved: a group is `FAIL` only while at least one non-muted FAIL remains, otherwise it is `PASS` (including fully-muted groups). The `filter[status]` filter and the `sort=status` ordering share the same semantics, keeping `status` consistent with `fail_count` and the orthogonal `muted` flag [(#10825)](https://github.com/prowler-cloud/prowler/pull/10825) +- `aggregate_findings` is now idempotent: it deletes the scan's existing `ScanSummary` rows before `bulk_create`, so re-runs (such as the post-mute reaggregation pipeline) no longer violate the `unique_scan_summary` constraint and no longer abort the downstream `DailySeveritySummary` / `FindingGroupDailySummary` recomputation for the affected scan [(#10827)](https://github.com/prowler-cloud/prowler/pull/10827) --- diff --git a/api/src/backend/tasks/jobs/scan.py b/api/src/backend/tasks/jobs/scan.py index 456fd786cb8..a501ba28b0d 100644 --- a/api/src/backend/tasks/jobs/scan.py +++ b/api/src/backend/tasks/jobs/scan.py @@ -1198,6 +1198,9 @@ def aggregate_findings(tenant_id: str, scan_id: str): ) for agg in aggregation } + # Delete first so re-runs (e.g. post-mute reaggregation) don't hit + # the `unique_scan_summary` constraint. + ScanSummary.objects.filter(tenant_id=tenant_id, scan_id=scan_id).delete() ScanSummary.objects.bulk_create(scan_aggregations, batch_size=3000) diff --git a/api/src/backend/tasks/tasks.py b/api/src/backend/tasks/tasks.py index bbf4d897723..b4a8f91668f 100644 --- a/api/src/backend/tasks/tasks.py +++ b/api/src/backend/tasks/tasks.py @@ -771,15 +771,22 @@ def aggregate_finding_group_summaries_task(tenant_id: str, scan_id: str): ) @set_tenant(keep_tenant=True) def reaggregate_all_finding_group_summaries_task(tenant_id: str): - """Reaggregate finding group summaries for every (provider, day) combination. + """Reaggregate every pre-aggregated summary table for this tenant. Mirrors the unbounded scope of `mute_historical_findings_task`: that task rewrites every Finding row whose UID matches a mute rule, with no time - limit. To keep the daily summaries consistent with that update, this task - re-runs the aggregator on the latest completed scan of every (provider, - day) pair that exists in the database. Tasks are dispatched in parallel - via a Celery group so the wallclock scales with the worker pool, not with - the number of pairs. + limit. 
To keep the pre-aggregated tables consistent with that update, + this task re-runs the same per-scan aggregation pipeline that scan + completion runs on the latest completed scan of every (provider, day) + pair, rebuilding the three tables that power the read endpoints: + + - `ScanSummary` and `DailySeveritySummary` -> `/overviews/findings`, + `/overviews/findings-severity`, `/overviews/services`. + - `FindingGroupDailySummary` -> `/finding-groups` and + `/finding-groups/latest`. + + Per-scan pipelines are dispatched in parallel via a Celery group so + wallclock scales with the worker pool. """ completed_scans = list( Scan.objects.filter( @@ -804,12 +811,23 @@ def reaggregate_all_finding_group_summaries_task(tenant_id: str): scan_ids = list(latest_scans.values()) if scan_ids: logger.info( - "Reaggregating finding group summaries for %d scans (provider x day)", + "Reaggregating overview/finding summaries for %d scans (provider x day)", len(scan_ids), ) + # DailySeveritySummary reads from ScanSummary, so ScanSummary must be + # recomputed first; FindingGroupDailySummary reads from Finding + # directly and can run in parallel with the severity step. group( - aggregate_finding_group_summaries_task.si( - tenant_id=tenant_id, scan_id=scan_id + chain( + perform_scan_summary_task.si(tenant_id=tenant_id, scan_id=scan_id), + group( + aggregate_daily_severity_task.si( + tenant_id=tenant_id, scan_id=scan_id + ), + aggregate_finding_group_summaries_task.si( + tenant_id=tenant_id, scan_id=scan_id + ), + ), ) for scan_id in scan_ids ).apply_async() diff --git a/api/src/backend/tasks/tests/test_scan.py b/api/src/backend/tasks/tests/test_scan.py index 716033684b4..d176929105f 100644 --- a/api/src/backend/tasks/tests/test_scan.py +++ b/api/src/backend/tasks/tests/test_scan.py @@ -36,6 +36,7 @@ Provider, Resource, Scan, + ScanSummary, StateChoices, StatusChoices, ) @@ -3358,6 +3359,64 @@ def test_aggregate_findings_groups_by_dimensions( regions = {s.region for s in summaries} assert regions == {"us-east-1", "us-west-2"} + def test_aggregate_findings_is_idempotent_on_rerun( + self, + tenants_fixture, + scans_fixture, + findings_fixture, + ): + """Re-running `aggregate_findings` for the same scan must not violate + the `unique_scan_summary` constraint, and the resulting row set for + the scan must match the single-run output. This is exercised by the + post-mute reaggregation pipeline, which re-dispatches + `perform_scan_summary_task` against scans whose summaries already + exist.""" + tenant = tenants_fixture[0] + scan = scans_fixture[0] + + aggregate_findings(str(tenant.id), str(scan.id)) + first_run_ids = set( + ScanSummary.all_objects.filter( + tenant_id=tenant.id, scan_id=scan.id + ).values_list("id", flat=True) + ) + first_run_rows = list( + ScanSummary.all_objects.filter(tenant_id=tenant.id, scan_id=scan.id).values( + "check_id", + "service", + "severity", + "region", + "fail", + "_pass", + "muted", + "total", + ) + ) + + # Second invocation must not raise and must replace the rows without + # leaving duplicates behind. 
+ aggregate_findings(str(tenant.id), str(scan.id)) + second_run_ids = set( + ScanSummary.all_objects.filter( + tenant_id=tenant.id, scan_id=scan.id + ).values_list("id", flat=True) + ) + second_run_rows = list( + ScanSummary.all_objects.filter(tenant_id=tenant.id, scan_id=scan.id).values( + "check_id", + "service", + "severity", + "region", + "fail", + "_pass", + "muted", + "total", + ) + ) + + assert second_run_rows == first_run_rows + assert first_run_ids.isdisjoint(second_run_ids) + @pytest.mark.django_db class TestAggregateFindingsByRegion: diff --git a/api/src/backend/tasks/tests/test_tasks.py b/api/src/backend/tasks/tests/test_tasks.py index 4a4108607ef..f9b581d3f07 100644 --- a/api/src/backend/tasks/tests/test_tasks.py +++ b/api/src/backend/tasks/tests/test_tasks.py @@ -2359,11 +2359,20 @@ class TestReaggregateAllFindingGroupSummaries: def setup_method(self): self.tenant_id = str(uuid.uuid4()) + @patch("tasks.tasks.chain") @patch("tasks.tasks.group") @patch("tasks.tasks.aggregate_finding_group_summaries_task") + @patch("tasks.tasks.aggregate_daily_severity_task") + @patch("tasks.tasks.perform_scan_summary_task") @patch("tasks.tasks.Scan.objects.filter") def test_dispatches_subtasks_for_each_provider_per_day( - self, mock_scan_filter, mock_agg_task, mock_group + self, + mock_scan_filter, + mock_scan_summary_task, + mock_daily_severity_task, + mock_finding_group_task, + mock_group, + mock_chain, ): provider_id_1 = uuid.uuid4() provider_id_2 = uuid.uuid4() @@ -2373,8 +2382,13 @@ def test_dispatches_subtasks_for_each_provider_per_day( today = datetime.now(tz=timezone.utc) yesterday = today - timedelta(days=1) - mock_group_result = MagicMock() - mock_group.side_effect = lambda gen: (list(gen), mock_group_result)[1] + mock_outer_group_result = MagicMock() + # The first `group()` call wraps the inner (severity, finding-group) + # parallel step; subsequent calls wrap the outer per-scan generator. 
+ mock_group.side_effect = lambda *args, **kwargs: ( + list(args[0]) if args and hasattr(args[0], "__iter__") else None, + mock_outer_group_result, + )[1] mock_scan_filter.return_value.order_by.return_value.values.return_value = [ { @@ -2397,23 +2411,40 @@ def test_dispatches_subtasks_for_each_provider_per_day( result = reaggregate_all_finding_group_summaries_task(tenant_id=self.tenant_id) assert result == {"scans_reaggregated": 3} - assert mock_agg_task.si.call_count == 3 - mock_agg_task.si.assert_any_call( - tenant_id=self.tenant_id, scan_id=str(scan_id_today_p1) - ) - mock_agg_task.si.assert_any_call( - tenant_id=self.tenant_id, scan_id=str(scan_id_today_p2) - ) - mock_agg_task.si.assert_any_call( - tenant_id=self.tenant_id, scan_id=str(scan_id_yesterday_p1) - ) - mock_group_result.apply_async.assert_called_once() + expected_scan_ids = { + str(scan_id_today_p1), + str(scan_id_today_p2), + str(scan_id_yesterday_p1), + } + for task_mock in ( + mock_scan_summary_task, + mock_daily_severity_task, + mock_finding_group_task, + ): + assert task_mock.si.call_count == 3 + dispatched = { + call.kwargs["scan_id"] for call in task_mock.si.call_args_list + } + assert dispatched == expected_scan_ids + for call in task_mock.si.call_args_list: + assert call.kwargs["tenant_id"] == self.tenant_id + assert mock_chain.call_count == 3 + mock_outer_group_result.apply_async.assert_called_once() + @patch("tasks.tasks.chain") @patch("tasks.tasks.group") @patch("tasks.tasks.aggregate_finding_group_summaries_task") + @patch("tasks.tasks.aggregate_daily_severity_task") + @patch("tasks.tasks.perform_scan_summary_task") @patch("tasks.tasks.Scan.objects.filter") def test_dedupes_scans_to_latest_per_provider_per_day( - self, mock_scan_filter, mock_agg_task, mock_group + self, + mock_scan_filter, + mock_scan_summary_task, + mock_daily_severity_task, + mock_finding_group_task, + mock_group, + mock_chain, ): """When several scans run on the same day for the same provider, only the latest one is dispatched (matching the daily summary unique key).""" @@ -2423,8 +2454,11 @@ def test_dedupes_scans_to_latest_per_provider_per_day( today_late = datetime.now(tz=timezone.utc) today_early = today_late - timedelta(hours=4) - mock_group_result = MagicMock() - mock_group.side_effect = lambda gen: (list(gen), mock_group_result)[1] + mock_outer_group_result = MagicMock() + mock_group.side_effect = lambda *args, **kwargs: ( + list(args[0]) if args and hasattr(args[0], "__iter__") else None, + mock_outer_group_result, + )[1] # Returned ordered by `-completed_at`, so the most recent comes first. 
mock_scan_filter.return_value.order_by.return_value.values.return_value = [ @@ -2443,17 +2477,27 @@ def test_dedupes_scans_to_latest_per_provider_per_day( result = reaggregate_all_finding_group_summaries_task(tenant_id=self.tenant_id) assert result == {"scans_reaggregated": 1} - mock_agg_task.si.assert_called_once_with( - tenant_id=self.tenant_id, scan_id=str(latest_scan_today) - ) - mock_group_result.apply_async.assert_called_once() + for task_mock in ( + mock_scan_summary_task, + mock_daily_severity_task, + mock_finding_group_task, + ): + task_mock.si.assert_called_once_with( + tenant_id=self.tenant_id, scan_id=str(latest_scan_today) + ) + mock_chain.assert_called_once() + mock_outer_group_result.apply_async.assert_called_once() + @patch("tasks.tasks.chain") @patch("tasks.tasks.group") @patch("tasks.tasks.Scan.objects.filter") - def test_no_completed_scans_skips_dispatch(self, mock_scan_filter, mock_group): + def test_no_completed_scans_skips_dispatch( + self, mock_scan_filter, mock_group, mock_chain + ): mock_scan_filter.return_value.order_by.return_value.values.return_value = [] result = reaggregate_all_finding_group_summaries_task(tenant_id=self.tenant_id) assert result == {"scans_reaggregated": 0} mock_group.assert_not_called() + mock_chain.assert_not_called()
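
Reviewer note: the diff shows the new `chain`/`group` dispatch but not the step that picks which scans get re-run. The sketch below is a minimal, standalone illustration of that dedup step as described in the task docstring and exercised by `test_dedupes_scans_to_latest_per_provider_per_day`: scans are assumed to arrive ordered by `-completed_at`, so the first scan seen for each `(provider_id, day)` key is the latest one and older same-day scans are skipped. The function name `latest_scan_per_provider_day` and the dict shape are illustrative assumptions, not the actual task code.

```python
from datetime import datetime, timedelta, timezone
from uuid import uuid4


def latest_scan_per_provider_day(completed_scans):
    """Keep only the most recent scan per (provider_id, calendar day).

    Assumes `completed_scans` is already ordered by `-completed_at`, so the
    first entry seen for a key wins and older same-day scans are ignored.
    """
    latest: dict = {}
    for scan in completed_scans:
        key = (scan["provider_id"], scan["completed_at"].date())
        latest.setdefault(key, str(scan["id"]))
    return list(latest.values())


if __name__ == "__main__":
    provider = uuid4()
    now = datetime.now(tz=timezone.utc)
    # Two scans for the same provider on the same day, ordered by -completed_at.
    scans = [
        {"id": uuid4(), "provider_id": provider, "completed_at": now},
        {"id": uuid4(), "provider_id": provider, "completed_at": now - timedelta(hours=4)},
    ]
    assert len(latest_scan_per_provider_day(scans)) == 1  # only the latest survives
```

Each surviving scan id is then fed into the `chain(perform_scan_summary_task, group(aggregate_daily_severity_task, aggregate_finding_group_summaries_task))` pipeline shown in the `tasks.py` hunk, so `ScanSummary` is rebuilt before `DailySeveritySummary` reads from it, while the finding-group rebuild runs in parallel.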