Server-Sent Events for history + notification updates #22513
mvdbeek wants to merge 38 commits into galaxyproject:dev from
Conversation
Force-pushed from 48f2c36 to 548b1bd
LOL, YESSSSSSS! 🥳
Force-pushed from 3bd1c8d to f482035
🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉 🎉 Thank you for keeping the fallback poll interval. I've tried SSE and, although very rarely, sometimes events were not reaching some clients. I find it very sane to provide polling as a fallback, mainly because it is really confusing for the user when it happens.
Force-pushed from e13662a to 039c5fb
```python
response = requests.get(urljoin(self.url, "api/users/current"), params={"key": api_key})
response.raise_for_status()
encoded_id = response.json()["id"]
return self._app.security.decode_id(encoded_id)
```
I would move this to a helper in the IntegrationTestCase.
Not sure I'll keep this test case; it seems like something we can piggyback onto the existing IT tests.
Force-pushed from bb7524b to bf5b7b7
```python
# The ``/api/users/current`` endpoint returns the encoded id; decode
# via the app's security helper so we get the raw int the job row
# needs.
import requests
```
And that hasn't gone through gx-arch-review yet ...
Took a glance over the code and nothing is too scary other than just the vastness of how much of it I don't understand but wish I did, but I couldn't find anything locking us into problematic interfaces on first pass. I think the piece of documentation that would help me understand the system better is the admin story: how is it configured, how do we decide the one process per deployment that does the history audit manager thing (I did read that it is one process per deployment, right?), are we constructing the manager for every deployment and only starting it in one, etc... Well, that went from the docs I want to see to the questions I had, but I'm disorganized. Overall though, an exciting feature: we've talked about this for years and it never felt viable to me; this feels possible now and not as, let's say, intricate or bespoke as I feared it would be. Maybe other nice-to-haves for feeling better about this would be a whole Selenium suite run using this, or... maybe we drop Selenium tests for tests we're already Playwright testing and then add a new Playwright suite to the normal CI that runs in this mode. The integration tests in the PR are valuable; they will probably help debug and confirm particular things about the implementation going forward, but given that I don't understand the implementation I don't know that they give me a lot of confidence the UI will still work. The full suite is what would give me that confidence.
Fully agree with all you said. I do have markdown documents with the admin story, architecture, performance concerns, likely breaking points, etc.

It should be in production for sure, but that's not there yet. It can be a very lightweight external process or it could live with the config watcher in a first pass. I think 26.1 should probably come with a handful of (optional) new processes (history listener, config watcher, invocation completion monitor).
And yes, the full frontend suite should pass at least once; skipping redundant Selenium sounds good. Maybe we add a full test run to the release testing CI?
Force-pushed from 79b1287 to 829c4fa
Add real-time notification delivery via SSE to replace the 30-second polling interval. The SSE endpoint streams notification_update, broadcast_update, and notification_status events to connected clients.

Backend:
- New SSEConnectionManager (lib/galaxy/managers/sse.py) maps user IDs to asyncio queues with thread-safe push via call_soon_threadsafe
- SSE streaming endpoint at GET /api/notifications/stream with Last-Event-ID catch-up support and 30s keepalive
- Kombu control tasks (notify_users, notify_broadcast) fan out events across all Galaxy worker processes
- Existing polling API unchanged for backward compatibility

Frontend:
- New useNotificationSSE composable using EventSource with auto-reconnect
- notificationsStore tries SSE first, falls back to polling after 5+ consecutive errors

Tests:
- API integration tests for SSE event delivery, broadcasts, and reconnect
- Selenium E2E tests for notification appearance and bell indicator

Add SSE-based real-time history update notifications

Replace aggressive 3-second history polling with Server-Sent Events driven by database change detection, configurable via admin setting.

Backend:
- Add pg_notify() to PostgreSQL audit triggers for instant LISTEN/NOTIFY
- New HistoryAuditMonitor: PG LISTEN/NOTIFY with SQLite polling fallback
- New /api/events/stream SSE endpoint (uses StructuredApp, not MinimalManagerApp)
- Kombu control task "history_update" with message TTL (expiration=10s)
- Config: enable_sse_history_updates, history_audit_monitor_poll_interval

Frontend:
- Generalize useNotificationSSE → useSSE composable with event type filtering
- historyStore connects SSE for history_update events, triggers immediate refresh
- notificationsStore updated to use /api/events/stream and useSSE
- Polling kept as fallback at existing intervals

Tests:
- 5 integration tests: endpoint, dataset upload event, history ID in payload, cross-user isolation, polling backward compatibility
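For orientation, a minimal sketch of the connection-map pattern the first commit describes: one per-worker registry of user id → asyncio queues, with pushes from worker threads marshalled onto the event loop via `call_soon_threadsafe`. Class and method names follow the commit message; the real code in `lib/galaxy/managers/sse.py` differs in detail (event ids, disconnect detection, keepalives).

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class SSEEvent:
    event_type: str
    data: str
    event_id: str


class SSEConnectionManager:
    """Per-worker map of user id -> asyncio queues for that user's open streams."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._connections: dict[int, list[asyncio.Queue]] = defaultdict(list)

    def connect(self, user_id: int) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._connections[user_id].append(queue)
        return queue

    def disconnect(self, user_id: int, queue: asyncio.Queue) -> None:
        self._connections[user_id].remove(queue)
        if not self._connections[user_id]:
            del self._connections[user_id]

    def push_to_user(self, user_id: int, event: SSEEvent) -> None:
        # Called from queue_worker handler threads, so hop onto the event loop
        # instead of touching the asyncio queues directly; this is a no-op if
        # the user has no open stream on this worker.
        for queue in self._connections.get(user_id, []):
            self._loop.call_soon_threadsafe(queue.put_nowait, event)
```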
Three related issues uncovered while exercising the SSE push path end-to-end:

- Celery workers could not publish SSE control tasks. Move the AMQP connection + publisher-only GalaxyQueueWorker construction up into GalaxyManagerApplication so every manager app gets a publisher, and add an explicit bind_publisher() entry point. Web workers still start a consumer via bind_and_start post-fork.
- Under gunicorn --preload, config.server_name is rewritten post-fork (main -> main.1) but the pre-fork publisher bindings were never refreshed, so the consumer listened on control.main@host while producers published to control.main.1@host. bind_and_start now always re-invokes bind_publisher so consumer queues match the post-fork identity.
- all_control_queues_for_declare required database_heartbeat, which Celery workers don't run. Query WorkerProcess directly (with an active-window filter), and add a webapp_only flag so SSEEventDispatcher only fans out to processes that actually have browser connections.
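The publisher/consumer split is easiest to see in plain kombu terms. A sketch under assumed names (exchange name, queue-naming scheme): a publisher-only binding is just a `Producer`, while the consumer binding must use the post-fork `server_name`, which is why `bind_and_start` re-invokes `bind_publisher` after fork. This is illustrative kombu usage, not Galaxy's actual `GalaxyQueueWorker` code.

```python
import socket

from kombu import Connection, Exchange, Queue

control_exchange = Exchange("control", type="topic")


def bind_publisher(conn: Connection):
    # Publisher-only binding: enough to send control tasks, no consumer loop.
    # This is all a Celery worker / manager app needs.
    return conn.Producer(exchange=control_exchange, serializer="json")


def bind_and_start(conn: Connection, server_name: str, hostname: str, on_message) -> None:
    # Consumer binding used by web workers. The queue name must reflect the
    # post-fork identity (e.g. "main.1"); otherwise producers publish to
    # control.main.1@host while the consumer listens on control.main@host.
    queue = Queue(f"control.{server_name}@{hostname}", control_exchange, routing_key="control.*")
    with conn.Consumer(queues=[queue], callbacks=[on_message]):
        while True:
            try:
                conn.drain_events(timeout=1)
            except socket.timeout:
                pass  # idle; keep listening
```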
The first iteration of SSE-driven history updates had two client bugs:

- startWatchingHistoryWithSSE() was called from many places (component mounts, upload/tool/workflow completion hooks) and each call re-opened the EventSource. That flapped `connected` false -> true repeatedly, so the watch that was supposed to stop the 3s poll on SSE-up never had a stable transition and polling kept running. Make SSE init idempotent and drive start/stop of the polling fallback from the connected ref.
- watchHistoryOnce short-circuits when lastUpdateTime >= history.update_time, so an SSE push arriving before the client's next poll tick produced no UI change until a full history reload. Add refreshHistoryFromPush() that forces the fetch, skipping both the `since` query param and the update_time gate, and use it from the SSE handler.
SSEEventDispatcher lived in galaxy.managers.sse alongside the pure connection types (SSEEvent, SSEConnectionManager). That forced two local imports inside _send() — send_control_task and all_control_queues_for_declare — because galaxy.queue_worker depends on SSEConnectionManager/SSEEvent from sse.py, so hoisting the queue_worker import to the top of sse.py would create a cycle.

Move the dispatcher to galaxy.managers.sse_dispatch. It can then import from galaxy.queue_worker and galaxy.queues at module top: the new module is a leaf with respect to those dependencies (queue_worker never imports it back), so there's no cycle. Callers (app, notification manager, history audit monitor) import SSEEventDispatcher from its new home.

Note: the conditional `import psycopg` / `import psycopg2` in history_audit_monitor.py and the feature-flagged HistoryAuditMonitor import in app.py are retained — both are legitimate optional/gated loads, not cycle workarounds.
The SSE history-update pipeline (HistoryAuditMonitor LISTENing on galaxy_history_update) only fires on pg_notify. Fresh installs get the notify-emitting trigger functions from update_audit_table.py, but the last trigger-touching migration (c716ee82337b) re-created the functions without pg_notify, so upgraded databases silently never dispatched events. This revision replaces both audit trigger functions with notify-enabled versions; SQLite is a no-op.
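For reference, the general shape of such a migration: re-create the trigger function so a write both updates the audit table and emits the NOTIFY. The function body, channel payload casting, and revision wiring below are illustrative placeholders, not the PR's actual migration.

```python
from alembic import op

# Placeholder plpgsql body: the real functions come from update_audit_table.py's
# shared build_trigger_fn; only the PERFORM pg_notify(...) line is the new part.
NOTIFY_TRIGGER_FN = """
CREATE OR REPLACE FUNCTION fn_audit_history_by_id() RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO history_audit (history_id, update_time)
        VALUES (NEW.history_id, (now() AT TIME ZONE 'utc'))
        ON CONFLICT DO NOTHING;
    PERFORM pg_notify('galaxy_history_update', NEW.history_id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""


def upgrade():
    if op.get_bind().dialect.name == "postgresql":  # SQLite is a no-op
        op.execute(NOTIFY_TRIGGER_FN)
```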
The previous fallback pattern watched EventSource `connected` to switch between SSE and polling. That conflates two unrelated things: network blips (which EventSource auto-recovers from) and deployment config (which decides whether events ever arrive at all). In particular, `/api/events/stream` accepts connections even when HistoryAuditMonitor is disabled, so the watch-based design silently stopped polling and waited forever for events that never came.

Gate the choice on the server config flag instead:

- Expose `enable_sse_history_updates` via ConfigSerializer so the client can read it alongside `enable_notification_system`.
- In historyStore/notificationsStore, read the flag once after config loads: if true, prime an initial fetch and connect SSE; if false, start the resource watcher for polling. No runtime toggle.
- Instantiate `useResourceWatcher` lazily inside the polling branch so its `visibilitychange` listener — which would otherwise restart polling on every tab focus — is never registered in SSE mode.
- Drop the 5-error permanent-give-up in `useSSE`; with no runtime fallback it would just freeze the client. Rely on EventSource's native Last-Event-ID reconnect.

Add Vitest coverage for both stores covering SSE-on and SSE-off scenarios, including a tab-visibility toggle that must not re-arm polling in SSE mode.
wait_for_selector_visible dispatches as a CSS selector on both backends; `text=` is a Playwright engine prefix that Selenium's css-selector path rejects with InvalidSelectorException. wait_for_xpath_visible works on both backends.
Force-pushed from cd81543 to 0bec160
Flip enable_notification_system, enable_sse_history_updates, and enable_sse_entry_point_updates in the Selenium, Playwright, and Integration Selenium workflows so the full UI test surface exercises the SSE pipeline. Revert before merging.
HistoryAuditMonitor only dispatched history_update events via push_to_user, so anonymous-owned histories (user_id IS NULL) never produced events. With enable_sse_history_updates on, the client disables polling and waits for SSE events — leaving anonymous history panels frozen, which broke seven UI tests across the Playwright suites.

Extend the pipeline with a parallel galaxy_session.id-keyed route:

- SSEConnectionManager tracks a _session_connections map alongside _connections and exposes push_to_session; connect/disconnect/stream accept an optional galaxy_session_id.
- EventsService.open_stream forwards trans.galaxy_session.id so anonymous sessions register under their session key.
- SSEEventDispatcher.history_update and HistoryUpdatePayload gain an optional session_updates dict; the queue_worker handler fans out session-keyed events via push_to_session.
- HistoryAuditMonitor caches (user_id, session_ids) per history and performs one extra indexed lookup against GalaxySessionToHistoryAssociation only for anon histories.

galaxy_session.id never leaves the server — it's used only as an in-memory/AMQP dispatch key; the browser-visible event payload still contains just encoded history_ids.
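Extending the earlier connection-manager sketch, the session-keyed route is essentially a second registry keyed by `galaxy_session.id`. Names follow the commit; the internals are illustrative.

```python
import asyncio
from collections import defaultdict
from typing import Optional


class SSEConnectionManager:
    """Sketch: user-keyed and session-keyed registries side by side."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._connections: dict[int, list[asyncio.Queue]] = defaultdict(list)
        self._session_connections: dict[int, list[asyncio.Queue]] = defaultdict(list)

    def connect(self, user_id: Optional[int] = None, galaxy_session_id: Optional[int] = None) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        if user_id is not None:
            self._connections[user_id].append(queue)
        if galaxy_session_id is not None:
            # anonymous users have no user_id; they register under their session id
            self._session_connections[galaxy_session_id].append(queue)
        return queue

    def push_to_session(self, galaxy_session_id: int, event) -> None:
        for queue in self._session_connections.get(galaxy_session_id, []):
            self._loop.call_soon_threadsafe(queue.put_nowait, event)
```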
Force-pushed from 8e72fcb to 4e8dd3c
…e socket

Form-based login / registration / logout all navigate via ``window.location.href``. Chrome doesn't guarantee that an open ``EventSource`` (or two: history + notifications) is torn down before the navigation issues requests for the new page — in selenium we've seen the new page load with the stale anonymous cookie because the server still had the old stream's trans in flight when ``GET /`` landed. Symptom: masthead stays on Login/Register and ``/api/users/current`` returns the anonymous quota even though ``POST /user/create`` returned 200.

Adding a ``pagehide`` listener inside ``useSSE`` forces ``eventSource.close()`` synchronously right before navigation, closing the race. ``pagehide`` is preferred over ``beforeunload`` because it fires for back-forward-cache restores too and can't be cancelled by other handlers.

Reproduced locally with ``GALAXY_CONFIG_OVERRIDE_ENABLE_NOTIFICATION_SYSTEM=1 GALAXY_CONFIG_OVERRIDE_ENABLE_SSE_HISTORY_UPDATES=1 ./run_tests.sh -selenium lib/galaxy_test/selenium/test_history_sharing.py::TestHistorySharing::test_unsharing`` — 3/3 failures before this change, 3/3 passes after.
Force-pushed from 4e8dd3c to b4bf73a
Galaxy's Playwright driver teardown was ``browser.close()`` then ``playwright.stop()``. If ``browser.close()`` raised — in CI we've seen it hit target-detached errors or stall long enough to time out — the exception escaped before ``stop()`` ran, so the per-instance asyncio loop stayed flagged as "running" on the main test thread. Every subsequent test's ``sync_playwright().__enter__`` then rejected with "Playwright Sync API inside the asyncio loop" and the entire shard cascaded into 80+ ERRORs at setup. Wrap ``close()`` in ``try/finally`` and swallow + log any ``stop()`` exception so one test's bad teardown can't poison the next test's driver init.
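A sketch of the exception-safe teardown under assumed variable names (not Galaxy's driver wrapper): `stop()` must run even when `close()` raises, and a failing `stop()` must not mask the original error.

```python
import logging

from playwright.sync_api import sync_playwright

log = logging.getLogger(__name__)

playwright = sync_playwright().start()
browser = playwright.chromium.launch()


def quit_driver() -> None:
    try:
        browser.close()  # may raise target-detached / timeout errors in CI
    finally:
        try:
            playwright.stop()  # always unregister the per-instance asyncio loop
        except Exception:
            log.exception("playwright.stop() failed during teardown")
```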
…'t clobbered

When handle_user_login invalidates the previous anonymous session and a concurrent request using the old cookie is still in flight, the server creates a *new* anonymous session for it and responds with a fresh `Set-Cookie: galaxysession=<anon>`. If that response lands between the login POST and the full-page navigation, the browser navigates with the anonymous cookie and the new page loads logged out. Under the TEMP SSE flag this happens often enough to trip `wait_for_logged_in` in selenium.

Fix: synchronously close all long-lived connections (SSE, polling watchers) and rotate a shared axios AbortController before sending the login/register POST. With no in-flight anonymous-cookie request, the server can't emit the clobbering Set-Cookie, and the authenticated cookie survives until navigation.
pytest does not call tearDown when setUp raises, so if setup_selenium fails after setup_driver_and_session (e.g. wait_for_logged_in times out during register), the Playwright instance is leaked. Its per-instance asyncio loop stays registered as "running" on the main thread and every subsequent test in the shard errors with "Sync API inside the asyncio loop", cascading the whole shard. Wrap the post-allocation work in try/except that invokes tear_down_driver before re-raising. Backend-agnostic since tear_down_driver branches on backend_type. Complements the try/finally in HasPlaywrightDriver.quit (57f1aba) which only helps when quit runs at all.
The previous login-race fix wired a shared AbortController through an axios interceptor, but most first-party API traffic now goes through GalaxyApi (openapi-fetch + native fetch), which ignored it. In-flight /api/... calls could therefore still return a clobbering Set-Cookie: galaxysession=<anon> after login. Add an openapi-fetch middleware that attaches the same shared signal to every request (combined via AbortSignal.any with any caller-set signal) and honours the SKIP_PENDING_REQUESTS_HEADER opt-out. Register it before the rate-limiter so aborted requests bypass the queue. Rename cancelPendingAxiosRequests -> cancelPendingRequests since one rotation now covers both transports.
Scratchbook windows (WinBox iframes) load the same analysis route (``/datasets/X/display``) as the main page, so each one boots a full Galaxy Vue app. ``historyStore.startWatchingHistory()`` was called unconditionally in ``App.vue::setup()``, meaning every iframe opened its own EventSource to ``/api/events/stream``. With 3 long-lived SSE streams from the main page plus one per iframe, two open dataset windows is enough to saturate the HTTP/1.1 6-connections-per-origin budget and hang the tab — ``test_scratchbook_window_persistence`` hung indefinitely on every CI run of Playwright shard 1.

Treat any frame where ``window.top !== window.self`` as embedded, in addition to the existing ``?embed=true`` route-query check. That suppresses history SSE in iframes (and also the existing ``startWatchingEntryPoints`` / ``startWatchingNotifications`` calls, which were already gated on ``!embedded``), leaving the per-origin connection budget untouched at 3 used / 3 free so iframes can still make regular API calls without queueing behind SSE sockets.

Verified locally: test passes in 43s with SSE flags on; previously hung past the 5-minute timeout.
The three SSE-driven stores (history, notifications, entry points) each called useSSE() in their own setup(), producing three separate EventSource connections to /api/events/stream. HTTP/1.1 caps per-origin connections at six, so SSE alone consumed half the budget — and under iframe-heavy flows (scratchbook) it starved the pool and hung the tab.

useSSE now multiplexes every subscriber through a single module-scoped EventSource with a per-event-type dispatch registry. Refcounted open/close so the socket is only created on first subscribe and closed when the last subscriber goes away. Existing store call sites are unchanged.

While here: drop the dead /api/notifications/stream route. The generic /api/events/stream endpoint (lib/galaxy/webapps/galaxy/api/events.py) already serves notifications alongside history_update and entry_point_update events via the same SSEConnectionManager, and no frontend code referenced the notification-only path. Delete the route, its service method, and the auto-generated schema entries.
Selenium and Playwright are green (in SSE mode), and this has in fact caught a whole bunch of issues, including what I think is a race condition that can also hit on 26.0 when you log in or out while a request is still in flight (4c095fc, d6250e2). I'll pull that out for 26.0. I'm still not 100% convinced the kombu layer is needed and performant, but it's nice to see that this works in principle. And it's so cheap these days to explore alternative architectures. Benchmarking is next, I guess?
Taking another look at the architecture before signing off on this — still have some doubts about the Kombu IPC layer (cleanup/TTL on the SQLAlchemy transport, per-process …).

Review notes so far: https://gist.github.com/mvdbeek/17cc86d6a3d75b86b6d23259ea87e985

Will follow up with concrete proposals.
- test_notification_sse: connect to the unified /api/events/stream rather
than the deleted /api/notifications/stream. The old URL was matching
FastAPI's /api/notifications/{notification_id} route and failing the
encoded-id length check with HTTP 400.
- test_history_sse: add SSELineListener.wait_for_event_where(predicate)
and use it in both history-update tests. wait_for_event returned after
the first history_update of any kind, so a background update for an
unrelated history could land before the test's own upload dispatched,
causing the assertion to fail even though the event was in flight.
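A sketch of the predicate-based wait; `SSELineListener` lives in `lib/galaxy_test/base/sse.py`, and the internals here (an `events` list filled by a reader thread) are assumptions made for illustration.

```python
import time
from typing import Callable


class SSELineListener:
    def __init__(self) -> None:
        self.events: list[dict] = []  # appended to by the stream-reading thread

    def wait_for_event_where(self, predicate: Callable[[dict], bool], timeout: float = 30.0) -> dict:
        # Keep consuming events until one matches, instead of asserting on the
        # first history_update of any kind.
        deadline = time.monotonic() + timeout
        seen = 0
        while time.monotonic() < deadline:
            while seen < len(self.events):
                event = self.events[seen]
                seen += 1
                if predicate(event):
                    return event
            time.sleep(0.1)
        raise TimeoutError("no matching SSE event within timeout")


# usage sketch: ignore background updates for unrelated histories
# listener.wait_for_event_where(lambda e: history_id in e.get("history_ids", []))
```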
Every webapp process was running its own HistoryAuditMonitor (and the per-process Postgres LISTEN that comes with it). Move to a single elected producer via a new is_history_audit_monitor role on DatabaseHeartbeat. The election prefers any WorkerProcess with app_type=sse_monitor, falling back to the max-server_name webapp when no dedicated daemon is running, so the monitor migrates cleanly when the leader dies.

Add a standalone galaxy-sse-monitor process (new lib/galaxy/sse_monitor package, galaxy-sse-monitor console script) that builds a minimal GalaxyManagerApplication, registers itself as sse_monitor.<host>.<pid>, and blocks on SIGTERM. Running this daemon moves SSE event production out of the webapp so its dispatch can't stall behind a webapp's GIL.

Gate HistoryAuditMonitor.start/stop on the role callback rather than a postfork hook; make start() restart-safe (clear _exit) and shutdown() idempotent so role transitions can cycle the monitor.
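The election rule in miniature, over hypothetical worker-process records (field names are stand-ins for the worker-process table columns): prefer a dedicated `sse_monitor` process, otherwise fall back to the max-`server_name` webapp.

```python
from typing import NamedTuple


class WorkerInfo(NamedTuple):
    server_name: str
    app_type: str  # e.g. "webapp" or "sse_monitor"


def should_run_history_audit_monitor(active: list[WorkerInfo], me: WorkerInfo) -> bool:
    """Return True if this process is the elected HistoryAuditMonitor producer."""
    candidates = [w for w in active if w.app_type == "sse_monitor"]
    if not candidates:
        candidates = [w for w in active if w.app_type == "webapp"]
    if not candidates:
        return False
    leader = max(candidates, key=lambda w: w.server_name)
    return leader.server_name == me.server_name
```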
The SA transport flags consumed messages visible=0 but never deletes them, so the default on-disk control.sqlite and its kombu_message table grow without bound. Add a prune_kombu_sqla_transport Celery task and a kombu_sqla_transport_cleanup_interval config option (default 900s) to periodically DELETE visible=0 rows. The task no-ops on non-SQLA broker schemes (RabbitMQ/Redis already honor per-message expiration).
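A sketch of what the cleanup amounts to, assuming the kombu SQLAlchemy-transport table layout named in the commit (`kombu_message`, `visible` flag); the real version is a Celery beat task wired to `kombu_sqla_transport_cleanup_interval`.

```python
from sqlalchemy import create_engine, text


def prune_kombu_sqla_transport(broker_url: str) -> int:
    """Delete already-consumed (visible = false) rows; no-op for non-SQLA brokers."""
    if not broker_url.startswith("sqlalchemy+"):
        return 0  # RabbitMQ / Redis already honor per-message expiration
    engine = create_engine(broker_url[len("sqlalchemy+"):])
    with engine.begin() as conn:
        result = conn.execute(text("DELETE FROM kombu_message WHERE visible = :v"), {"v": False})
    return result.rowcount
```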
Architecture/DI reviews flagged lib/galaxy/webapps/galaxy/metrics as a misplaced module — it has no webapp concern, is called from a Celery beat task, samples DB rows and broker state, and reached into StructuredApp as a service locator (app.amqp_internal_connection_obj, app.application_stack, app.model, app.execution_timer_factory, app[SSEConnectionManager]).

Move queue_metrics.py under lib/galaxy/managers where its role — a periodic sampler mirroring the new sse.py/sse_dispatch.py — fits cleanly. Drop the webapps/galaxy/metrics directory entirely.

Change emit_control_queue_depth, emit_worker_process_gauge and emit_queue_metrics to accept the narrow collaborators they actually need (Connection, ApplicationStack, GalaxyModelMapping, VanillaGalaxyStatsdClient, SSEConnectionManager). The Celery task in celery/tasks.py becomes the composition root that resolves those narrow deps from the app and passes them in.

Type the send_control_task / send_local_control_task return values, type _run(fn: Callable[[], None]), and replace StructuredApp's execution_timer_factory: Any annotation with a TYPE_CHECKING-guarded ExecutionTimerFactory reference so the interface is strongly typed. Tests migrate to the new signatures and drop the _ContainerApp fake.
Dispatch tests had to monkeypatch module-level globals (ControlTask and all_control_queues_for_declare in galaxy.managers.sse_dispatch) because the dispatcher constructed its ControlTask and called the queues helper directly — the tests were effectively testing their own patch setup.

Make both seams injectable on SSEEventDispatcher:

- control_task_factory: Callable[[GalaxyQueueWorker], ControlTask], default ControlTask.
- queues_provider: Callable[[], list[Queue]], default closes over application_stack and delegates to all_control_queues_for_declare (webapp_only=True) so production behavior is unchanged.

test_sse_dispatch.py and test_sse_dispatch_cache.py drop every monkeypatch.setattr and pass FakeControlTask / a counting provider via the constructor. 9/9 unit tests still pass and the module-global knowledge leak from tests into production code is gone.
- Move the HistoryAuditMonitor import to the top of lib/galaxy/app — it's a pure Python class with no import-time side effects, so the conditional import added nothing. Drop the self._history_audit_monitor attribute and resolve the monitor via self[HistoryAuditMonitor] in the shutdown path so it stays off the app surface.
- Inject NotificationManager into ShareableService directly instead of reaching self.notification_service.notification_manager from inside sharable.py. Thread the new param through HistoriesService, PagesService, VisualizationsService and WorkflowsService (all Lagom-resolved, so there are no manual call sites to update).
- Add missing return annotations: EventsService.__init__ -> None, SSELineListener.__init__ -> None. Declare _PgListenAdapter._conn / .driver once at the class level so the psycopg / psycopg2 branches can assign without re-annotating the same attribute.
- Tighten the historyStore idempotency test: assert expect(deltaAfterSecond).toBe(1) instead of toBeLessThanOrEqual(1) so a no-op second startWatchingHistory fails the test rather than trivially passing. Add a comment to the SSE-event test noting that the mocked refreshHistoryFromPush asserts dispatch only; the actual refresh is covered by the Selenium SSE integration tests.
Two failures introduced by the previous polish commits, caught by CI:

- ruff UP035 on lib/galaxy_test/base/sse.py: Callable must be imported from collections.abc, not typing (newer ruff version than the local venv had).
- packages mypy on test_sse_dispatch_cache.py:78: the injected control_task_factory's type was Callable[[GalaxyQueueWorker], ControlTask], which rejected NoopControlTask (and would have also rejected FakeControlTask / BoomControlTask) because none of the test classes subclass ControlTask.

Introduce a ControlTaskLike Protocol covering the single method the dispatcher calls (``send_task(**kwargs)``) and type the factory against it. Keeps ControlTask itself as the production default and lets the test fakes pass via structural typing.
Previous Protocol-based typing went the wrong direction: a Protocol with send_task(**kwargs) was too loose for ControlTask.send_task (which has specific positional params) AND too tight to uniformly cover the test fakes (NoopControlTask only accepts **kwargs, FakeControlTask takes named positional params). Mypy rejected ControlTask itself as the default. Drop the Protocol; type the factory return as Any. The runtime collaborator is duck-typed — the dispatcher only invokes send_task(**named_kwargs) — and Any is what the DI seam is actually for.
isort in the format lane rejected my placement of ``from galaxy.managers.notification import NotificationManager`` — it belongs alphabetically among the other galaxy.managers.* imports, not above the service-layer imports.
``black --check`` flagged two minor reformats in the new code: the slice whitespace style (sqlalchemy+ prefix strip) and a log.warning call that fits on one line.
…'t clobbered

When handle_user_login invalidates the previous anonymous session and a concurrent request using the old cookie is still in flight, the server creates a *new* anonymous session for it and responds with a fresh `Set-Cookie: galaxysession=<anon>`. If that response lands between the login POST and the full-page navigation, the browser navigates with the anonymous cookie and the new page loads logged out.

Fix: synchronously stop the polling watchers and rotate a shared AbortController before sending the login/register POST. The shared signal is wired through both axios (via a request interceptor) and the GalaxyApi/openapi-fetch client (via a request middleware) so a single rotation cancels every in-flight request, regardless of transport. With no in-flight anonymous-cookie request, the server can't emit the clobbering Set-Cookie, and the authenticated cookie survives until navigation.

Backport of the relevant pieces of #22513 (sse-notifications) for release_26.0 — the same race exists outside SSE because it is the in-flight polling/REST traffic, not the SSE stream itself, that carries the stale cookie.
Server-Sent Events for history + notification updates
Summary
Replace Galaxy's 3-second history polling and 30-second notification polling with a push-based Server-Sent Events (SSE) pipeline driven by PostgreSQL `LISTEN/NOTIFY` and Kombu control tasks. The new path delivers events to the browser within milliseconds of the DB change while keeping polling available as a per-feature fallback for deployments that don't enable it.
Motivation
- Load: the 3s history poll hammers `/api/histories/{id}` even for idle users; hundreds of concurrent tabs multiply that into a real backend cost.
- Latency: dataset state changes and incoming notifications can take the full polling interval to surface in the UI.
- Signal quality: polling is a poor proxy for "something changed" — it drives DB load whether or not anything happened.
Architecture
History-update data flow (DB-driven)
Triggered by any write to `history` / `history_dataset_association` / `history_dataset_collection_association`, no matter the source (web, Celery, pulsar, direct SQL). One `HistoryAuditMonitor` per deployment owns the `LISTEN` socket; events fan out across web workers via the control exchange.
```mermaid
sequenceDiagram
    autonumber
    participant DB as PostgreSQL
    participant Trg as audit trigger<br/>(pg_notify)
    participant Mon as HistoryAuditMonitor<br/>(LISTEN thread, 1 process)
    participant Dsp as SSEEventDispatcher
    participant MQ as Kombu control<br/>exchange
    participant W1 as Web worker 1<br/>QueueWorker + SSEConnectionManager
    participant W2 as Web worker 2<br/>QueueWorker + SSEConnectionManager
    participant Br as Browser<br/>EventSource + historyStore
    DB->>Trg: UPDATE / INSERT on history / HDA / HDCA
    Trg->>Mon: NOTIFY galaxy_history_update, history_id
    Mon->>Mon: batch window + debounce<br/>resolve owning user_ids
    Mon->>Dsp: history_update(user_updates)
    Dsp->>MQ: publish control task<br/>declare_queues = webapp_only
    par Fan-out to every web worker
        MQ-->>W1: history_update(user_updates, event_id)
        MQ-->>W2: history_update(user_updates, event_id)
    end
    W1->>W1: encode ids → SSEEvent<br/>push_to_user → call_soon_threadsafe<br/>→ asyncio.Queue
    W2->>W2: push_to_user (no-op if user not on this worker)
    W1-->>Br: SSE frame on /api/events/stream
    Br->>Br: useSSE → historyStore.handleHistorySSEEvent
    Br-->>Br: if currentHistoryId matches:<br/>refreshHistoryFromPush()
```

Notification data flow (API-driven)
Triggered by an admin POST (or any server-side caller of `NotificationManager.send_notification_to_recipients`). Broadcast notifications follow the same path but hit `notify_broadcast` → `push_broadcast` instead of per-user delivery. Catch-up on reconnect is handled by `NotificationService.build_status_catchup` using the browser's `Last-Event-ID` header.

```mermaid
sequenceDiagram
    autonumber
    participant Admin as Admin client
    participant Svc as NotificationService
    participant Mgr as NotificationManager
    participant Dsp as SSEEventDispatcher
    participant MQ as Kombu control exchange
    participant W1 as Web worker 1<br/>QueueWorker + SSEConnectionManager
    participant W2 as Web worker 2<br/>QueueWorker + SSEConnectionManager
    participant Br as Browser<br/>EventSource + notificationsStore
    Admin->>Svc: POST /api/notifications
    Svc->>Mgr: send_notification_to_recipients
    Mgr->>Mgr: persist and resolve user_ids
    Mgr->>Dsp: notify_users with user_ids, payload, event_id
    Dsp->>MQ: publish control task<br/>declare_queues = webapp_only
    par Fan-out to every web worker
        MQ-->>W1: notify_users task
        MQ-->>W2: notify_users task
    end
    W1->>W1: build SSEEvent notification_update<br/>push_to_user for each uid
    W2->>W2: push_to_user, no-op if user not here
    W1-->>Br: SSE frame on /api/notifications/stream<br/>or /api/events/stream
    Br->>Br: useSSE then notificationsStore.handleSSEEvent
    Br-->>Br: merge notification into store state<br/>bump totalUnreadCount
    Note over Br,Svc: On reconnect or tab re-open<br/>EventSource sends Last-Event-ID header
    Br->>Svc: GET /api/notifications/stream<br/>header Last-Event-ID
    Svc->>Mgr: build_status_catchup since parsed id
    Svc-->>Br: SSEEvent notification_status<br/>missed notifications and unread count
```

Design decisions shared by both flows
- Fan-out over the control exchange reaches every web worker; whichever worker holds the browser's connection will find the user id in its `SSEConnectionManager` (the others no-op).
- `all_control_queues_for_declare(webapp_only=True)` keeps job handlers and workflow schedulers out of the fan-out.
- `enable_sse_history_updates` / `enable_notification_system` are read once at store init; no `watch(connected, ...)` flap.
- `event_id` via `galaxy.model.orm.now()` — matches Galaxy's DB timestamp convention and round-trips cleanly through `parse_event_id` for `Last-Event-ID` catch-up.

Changes
Backend
- `lib/galaxy/managers/sse.py` — `SSEConnectionManager` (per-worker user→queue map, thread-safe push), `SSEEvent`, `make_event_id`/`parse_event_id`, `IsDisconnected` callable type.
- `lib/galaxy/managers/sse_dispatch.py` — `SSEEventDispatcher`, narrow DI (`queue_worker`, `application_stack`), publishes via `ControlTask(qw).send_task(...)`.
- `lib/galaxy/managers/history_audit_monitor.py` — background thread with `psycopg`/`psycopg2` driver abstraction; polling fallback for SQLite.
- `lib/galaxy/model/triggers/update_audit_table.py` — triggers emit `pg_notify('galaxy_history_update', history_id)`. Shared `build_trigger_fn` used by both the runtime installer and the pg_notify migration.
- `lib/galaxy/model/migrations/alembic/versions_gxy/b8d5e2f9a1c7_...py` — backfills pg_notify-enabled trigger functions for installs that went through `c716ee82337b`. Uses `version > 10` to match the installed trigger shape.
- `lib/galaxy/queue_worker.py` — `bind_publisher()` split so Celery workers can publish without consuming; SSE control-task handlers `notify_users`/`notify_broadcast`/`history_update`.
- `lib/galaxy/queues.py` — `all_control_queues_for_declare(..., webapp_only=True)` queries `WorkerProcess` directly.
- `lib/galaxy/webapps/galaxy/api/events.py` + `services/events.py` — new unified `GET /api/events/stream` endpoint (thin controller, service layer).
- `lib/galaxy/webapps/galaxy/api/notifications.py` + `services/notifications.py` — new `GET /api/notifications/stream` + `open_stream` service method.
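To make the streaming endpoint concrete, a generic sketch of turning queued events into SSE wire frames (`id:` / `event:` / `data:` lines plus a blank-line terminator) with a keepalive comment frame. This is a plain FastAPI pattern under assumed names, not the PR's `events.py`.

```python
import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
queue: asyncio.Queue = asyncio.Queue()  # stand-in for a queue handed out by SSEConnectionManager


def format_sse(event_type: str, data: dict, event_id: str) -> str:
    # SSE wire format: id/event lines, a data line, then a blank line ends the frame
    return f"id: {event_id}\nevent: {event_type}\ndata: {json.dumps(data)}\n\n"


@app.get("/api/events/stream")
async def stream() -> StreamingResponse:
    async def event_source():
        while True:
            try:
                event_type, data, event_id = await asyncio.wait_for(queue.get(), timeout=30)
                yield format_sse(event_type, data, event_id)
            except asyncio.TimeoutError:
                yield ": keepalive\n\n"  # comment frame keeps proxies from closing the idle socket

    return StreamingResponse(event_source(), media_type="text/event-stream")
```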
Frontend
- `client/src/composables/useNotificationSSE.ts` — `useSSE(onEvent, eventTypes)` composable; exposes `window.__galaxy_sse_connected` and `__galaxy_sse_last_event_ts` for selenium.
- `client/src/stores/historyStore.ts` — SSE branch drives `refreshHistoryFromPush`; polling branch instantiated lazily so `visibilitychange` doesn't re-arm polling in SSE mode.
- `client/src/stores/notificationsStore.ts` — same config-driven SSE-or-polling pattern; handlers ingest `notification_update`/`broadcast_update`/`notification_status`.
- `client/vite-plugin-galaxy-dev-server.js` — skip response buffering for `text/event-stream` so the dev server doesn't sit on SSE frames forever.
Configuration
- `enable_notification_system`: `/api/notifications/stream` + SSE for notifications
- `enable_sse_history_updates`: `HistoryAuditMonitor` + SSE for history changes
- `history_audit_monitor_poll_interval`

Both flags are serialized to the client via `ConfigSerializer`; stores read them once and pick SSE or polling accordingly.
Testing
- API integration tests (`test/integration/test_notification_sse.py`, `test_history_sse.py`): real SSE streams parsed off the wire via `lib/galaxy_test/base/sse.py`; covers delivery, cross-user isolation, and `Last-Event-ID` catch-up including the negative case (pre-id subjects must NOT be replayed).
- Selenium integration tests (`test/integration_selenium/test_notification_sse.py`): assert `window.__galaxy_sse_last_event_ts` advances past a baseline before the UI assertion — catches silent regressions where the update arrives via polling.
- Vitest unit tests (`client/src/stores/*.test.ts`): 11 tests covering SSE-vs-polling selection, the tab-visibility invariant (polling must NOT re-arm in SSE mode), and `MessageEvent` handlers mutating store state.

Rollout / compatibility
- Existing polling endpoints (`/api/notifications`, `/api/notifications/status`, `history/current_history_json`) are untouched.
- The trigger migration uses `CREATE OR REPLACE FUNCTION`; downgrade restores the non-notify function bodies.
- Celery workers get a publisher-only `GalaxyQueueWorker` so notifications created in Celery tasks still fan out to web workers.