turbo-tasks-backend: prevent duplicate task restores with restoring bits#92389
Conversation
turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
Merging this PR will improve performance by 3.22%
Performance Changes
Stats from current PR: ✅ No significant changes detected
turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
Failing test suites (Commit: 622464d | About building and testing Next.js)
● Handle url imports › should render the /static page
● Handle url imports › should client-render the /static page
● Handle url imports › should render the /ssr page
● Handle url imports › should client-render the /ssr page
● Handle url imports › should render the /ssg page
● Handle url imports › should client-render the /ssg page
● Handle url imports › should render a static url image import
● Handle url imports › should allow url import in css
● Handle url imports › should respond on value api
…nd wait-for-restore logic

Add `meta_restoring`/`data_restoring` transient flag bits to `TaskFlags` so that only one thread performs the backing-storage I/O for a given task category. Threads that observe the restoring bit already set spin on a `Storage::restored` event until the restoring thread either sets the restored bit (success) or clears the restoring bit without setting restored (error / panic path).

The batch path in `prepare_tasks_with_callback` is restructured into two phases: Phase 1 claims unrestored tasks and performs I/O; Phase 2 waits on tasks whose restore was claimed by another thread.

Also adds a synchronous `EventListener::wait()` method, needed for the blocking wait inside backend operations, which run outside of an async context.

Co-Authored-By: Claude <noreply@anthropic.com>
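The claim-or-wait protocol this commit describes can be sketched with std primitives. This is a hedged, simplified model, not the real backend code: `TaskState`, `claim_or_wait`, and `finish` are hypothetical names, and a `Condvar` stands in for the real `Storage::restored` event and the synchronous `EventListener::wait()`.

```rust
use std::sync::{Condvar, Mutex};

// Hypothetical stand-ins for the restoring/restored bits on TaskFlags.
#[derive(Default)]
struct Flags {
    restoring: bool,
    restored: bool,
}

struct TaskState {
    flags: Mutex<Flags>,
    // Stand-in for the shared `Storage::restored` event.
    restored_event: Condvar,
}

impl TaskState {
    fn new() -> Self {
        TaskState {
            flags: Mutex::new(Flags::default()),
            restored_event: Condvar::new(),
        }
    }

    /// Returns true if this thread claimed the restore and must do the I/O.
    fn claim_or_wait(&self) -> bool {
        let mut flags = self.flags.lock().unwrap();
        if flags.restored {
            return false; // already restored, nothing to do
        }
        if !flags.restoring {
            flags.restoring = true; // claim: this thread performs the I/O
            return true;
        }
        // Another thread is restoring: block until it finishes or fails,
        // re-checking the flags under the lock after every wakeup.
        while flags.restoring && !flags.restored {
            flags = self.restored_event.wait(flags).unwrap();
        }
        if !flags.restored {
            // Restoring bit cleared without restored set: error path.
            panic!("restoring failed");
        }
        false
    }

    /// Called by the claiming thread once the backing-storage I/O is done.
    fn finish(&self, success: bool) {
        let mut flags = self.flags.lock().unwrap();
        flags.restoring = false;
        flags.restored = success;
        drop(flags); // release the lock before waking waiters
        self.restored_event.notify_all();
    }
}
```

A second thread calling `claim_or_wait()` while the first holds the claim blocks in the `while` loop and re-checks the flags after each wakeup, mirroring the listen-then-recheck pattern the commit describes.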
Extract the repeated "apply restore result + clear restoring bit" pattern into an
`apply_restore_result` free function, and the repeated wait-or-panic call into a
`wait_for_restore_or_panic` method. This removes ~50 lines of duplication across
`task()`, `task_pair()`, and `prepare_tasks_with_callback()`.
Also:
- Track `any_waiting` during Phase 1a to avoid an O(n) scan at the early-exit check
- Remove `has_data_result`/`has_meta_result` temporaries in Phase 1c
- Replace `Arc::new(e)` in batch error distribution with `format!("{e:?}")`
- Remove redundant `EventListener` type annotation (Rust infers it)
- Add `From<SpecificTaskDataCategory> for TaskDataCategory` conversion (used by helper)
- Tighten `Storage::restored` to `pub(crate)` (no external users)
Co-Authored-By: Claude <noreply@anthropic.com>
…rors
In prepare_tasks_with_callback Phase 1c, a task that both self-restores one
category AND waits on another thread for the other category had its callback
called twice: once in Phase 1c and again in Phase 2. Fix this by suppressing
the Phase 1c callback when the task still has pending waits; Phase 2 will call
the callback after all categories are fully restored.
Also collapse the nested `if let Some(x) = ... { if let Some(e) = ... { } }`
patterns into `if let Some(x) = ... && let Some(e) = ...` as required by
clippy::collapsible_if (-D warnings).
Co-Authored-By: Claude <noreply@anthropic.com>
- Drop the task lock before notify on success paths so woken threads don't immediately contend on the same DashMap shard (task, task_pair, Phase 1c)
- Only notify when we actually performed I/O (do_data/do_meta); skip the spurious notify when we only waited for another thread
- Change wait_for_restoring_task/wait_for_restore_or_panic to take SpecificTaskDataCategory instead of TaskDataCategory, preventing silent misbehavior if All were passed
- Fix duplicated error message: bail returns a short cause string; the panic in wait_for_restore_or_panic wraps it with full context
- Add debug_assert in apply_restore_result to catch callers that invoke it for an already-restored category
- Add comment on the was_unrestored capture explaining the ordering dependency
- Add missing comment on the meta batch Ok(None) arm

Co-Authored-By: Claude <noreply@anthropic.com>
b281fb6 to
63cf689
    if !is_restoring {
        // The restoring bit was cleared without setting the restored bit.
        // This means the restoring thread encountered an error.
        bail!("restoring thread failed");
Suggested change:
-    bail!("restoring thread failed");
+    bail!("restoring failed");
🤖 Fixed: bail!("restoring failed")
    tasks_to_restore_for_data_indicies.push(i);
    ready = false;

    if category.includes_data() && !task.flags.is_restored(TaskDataCategory::Data) {
Suggested change:
-    if category.includes_data() && !task.flags.is_restored(TaskDataCategory::Data) {
+    if category.includes_data() && !task.flags.data_restored() {
🤖 Fixed — now uses task.flags.data_restored() directly.
    tasks_to_restore_for_meta_indicies.push(i);
    ready = false;

    if category.includes_meta() && !task.flags.is_restored(TaskDataCategory::Meta) {
Suggested change:
-    if category.includes_meta() && !task.flags.is_restored(TaskDataCategory::Meta) {
+    if category.includes_meta() && !task.flags.meta_restored() {
🤖 Fixed — now uses task.flags.meta_restored() directly.
        StorageWriteGuard<'e>,
    ),
) {
    let mut data_count = 0;
Can we add a fast path here for when should_check_backing_storage is false? We could just set "restored" on all tasks and call the callback in a quick loop.
🤖 Done — added a fast path at the top of prepare_tasks_with_callback that short-circuits when !should_check_backing_storage(), marking all tasks as restored and calling callbacks directly without going through the I/O pipeline.
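As a rough illustration of that fast path, here is a minimal sketch. The function name `prepare_tasks_fast_path` and the `(task_id, restored)` tuples are hypothetical simplifications, not the real backend API:

```rust
// Fast path: when the backing storage never needs to be checked, mark every
// task restored and run the callbacks in a tight loop, skipping the whole
// claim/wait + I/O pipeline. Returns true if the fast path was taken.
fn prepare_tasks_fast_path(
    should_check_backing_storage: bool,
    tasks: &mut [(u32, bool)], // hypothetical (task_id, restored flag) pairs
    mut callback: impl FnMut(u32),
) -> bool {
    if should_check_backing_storage {
        return false; // slow path: caller runs the restore protocol
    }
    for (task_id, restored) in tasks.iter_mut() {
        *restored = true; // nothing to load, so the task counts as restored
        callback(*task_id);
    }
    true
}
```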
    self.backend.storage.restored.notify(usize::MAX);
    panic!("Failed to restore data for task {task_id}: {e:?}");
Do not early-exit here. We want all restore results to be applied before emitting the panic; otherwise the restoring bit won't be unset for the remaining tasks and other threads will wait forever. So store the task_ids and errors in a Vec, then notify and panic after the loop.
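The deferred-panic pattern the reviewer suggests could look roughly like this. Names and types are hypothetical simplifications; the real code clears a flag bit on the task itself and panics with the collected failures rather than returning them (returning keeps the sketch testable):

```rust
// Apply every restore result first, always clearing the "restoring" marker
// so no other thread waits forever, and only collect failures for a single
// notify + panic after the loop.
fn apply_all_restore_results(
    results: Vec<(u32, Result<(), String>)>,
    restoring_bits: &mut std::collections::HashMap<u32, bool>,
) -> Vec<(u32, String)> {
    let mut failures = Vec::new();
    for (task_id, result) in results {
        // Clear the restoring marker even on error, so waiters can observe
        // the failure instead of blocking indefinitely.
        restoring_bits.insert(task_id, false);
        if let Err(e) = result {
            failures.push((task_id, e));
        }
    }
    // Real code: self.backend.storage.restored.notify(usize::MAX) here,
    // then panic with all collected failures if `failures` is non-empty.
    failures
}
```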
    if let Some(result) = entry.data_restore_result.take() {
        // Capture before apply_restore_result sets is_restored, so we know whether
        // this restore was fresh (and we should update the task cache afterwards).
        let was_unrestored = !task.flags.is_restored(TaskDataCategory::Data);
was_unrestored is always true, since we only restore when it wasn't restored before. It cannot become restored concurrently since we prevent concurrent restores now.
🤖 Removed the was_unrestored check — task_type is now always populated when we have a data restore result, since we hold the restoring bit.
    self.backend.storage.restored.notify(usize::MAX);
    panic!("Failed to restore meta for task {task_id}: {e:?}");
🤖 Same fix applied — removed the redundant is_restored check in apply_restore_result (kept the debug_assert).
| drop(task); | ||
| self.task_lock_counter.release(); | ||
| prepared_task_callback(self, task_id, category, task); | ||
| self.backend.storage.restored.notify(usize::MAX); |
There was a problem hiding this comment.
Notify after the loop when all tasks have been restored.
🤖 Done — Phase 1c now only restores (drops lock), then a single notify(usize::MAX) is issued after the full loop completes.
    // Only call the callback if no other category is still being restored by another
    // thread. If so, Phase 2 calls the callback after all categories are fully restored.
    if !entry.wait_data && !entry.wait_meta {
        let task = self.backend.storage.access_mut(task_id);
        prepared_task_callback(self, task_id, category, task);
    }
Let's move the callback into a separate loop (new Phase 2; make waiting Phase 3). This will finish restoring earlier, so other threads can be notified sooner and spend less time waiting.
🤖 Done — callbacks are now in a separate Phase 2 loop that runs after Phase 1c has finished all restoring and issued the notification. Phase 3 then handles waiting.
    for entry in &tasks {
        if entry.wait_data {
            self.wait_for_restore_or_panic(entry.task_id, SpecificTaskDataCategory::Data);
        }
        if entry.wait_meta {
            self.wait_for_restore_or_panic(entry.task_id, SpecificTaskDataCategory::Meta);
        }
        if entry.wait_data || entry.wait_meta {
            // Now that the task is restored, call the callback
            self.task_lock_counter.acquire();
            let task = self.backend.storage.access_mut(entry.task_id);
            self.task_lock_counter.release();
            prepared_task_callback(self, entry.task_id, entry.category, task);
        }
    }
Waiting for each task on its own is a bit inefficient. Let's wait for all tasks together; this will only use a single listener on the event.
Also wrap it in if any_waiting.
🤖 Done — Phase 3 now waits for all tasks together with a single shared listen()/wait() loop, and the whole phase is wrapped in if any_waiting.
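The single-shared-listener wait can be sketched with std primitives. This is a hedged model with hypothetical names (`Restorer`, `wait_for_all`, `mark_restored`); a `Condvar` plays the role of the shared `Storage::restored` event, and a set of pending task ids replaces the per-task flag bits:

```rust
use std::collections::HashSet;
use std::sync::{Condvar, Mutex};

// Hypothetical shared state: the set of task ids still being restored,
// plus one shared event (Condvar) standing in for `Storage::restored`.
struct Restorer {
    pending: Mutex<HashSet<u32>>,
    restored: Condvar,
}

impl Restorer {
    fn new(ids: impl IntoIterator<Item = u32>) -> Self {
        Restorer {
            pending: Mutex::new(ids.into_iter().collect()),
            restored: Condvar::new(),
        }
    }

    /// Wait for *all* of `wanted` with a single listener: re-check the whole
    /// set after each wakeup instead of registering one listener per task.
    fn wait_for_all(&self, wanted: &[u32]) {
        let mut pending = self.pending.lock().unwrap();
        while wanted.iter().any(|id| pending.contains(id)) {
            pending = self.restored.wait(pending).unwrap();
        }
    }

    /// Mark one task restored and wake all waiters. The lock guard is a
    /// temporary, so it is released before notify_all runs (matching the
    /// "drop the lock before notify" change in this PR).
    fn mark_restored(&self, id: u32) {
        self.pending.lock().unwrap().remove(&id);
        self.restored.notify_all();
    }
}
```

Each wakeup re-checks the full wanted set under the lock, so one event covers any number of tasks and spurious wakeups are harmless.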
- Fast path in prepare_tasks_with_callback when !should_check_backing_storage()
- Use data_restored()/meta_restored() directly instead of is_restored(TaskDataCategory::*)
- Remove was_unrestored guard (always true since we hold the restoring bit)
- Remove redundant is_restored() check inside apply_restore_result (keep debug_assert)
- Fix bail! message: "restoring failed" instead of "restoring thread failed"
- Separate callbacks into Phase 2, after all restoring+notify is done in Phase 1c, so other threads are unblocked as early as possible
- Single notify(usize::MAX) after all tasks restored, not per-task
- Wait for all tasks together in Phase 3 with a single shared listener loop, wrapped in `if any_waiting`
- Add task_type and self_restored fields to TaskRestoreEntry

Co-Authored-By: Claude <noreply@anthropic.com>
What?

Adds `data_restoring` and `meta_restoring` transient flag bits to `TaskStorage`, a shared `restored: Event` on `Storage`, and a synchronous `EventListener::wait()` primitive. These are used to coordinate concurrent restore attempts so the same task is never loaded from the backing store more than once simultaneously.

Why?

When multiple threads race to access an unrestored task, each thread would independently call into the backing storage to load the task's data. This causes redundant I/O and can result in one thread's write being silently discarded when another thread also applies a restore for the same task. The fix ensures only one thread performs the actual I/O while the others wait for it to finish.

How?

New flag bits (`storage_schema.rs`): `meta_restoring` and `data_restoring` are transient (not persisted) bits that track whether a restore is currently in progress for each category of a task.

New `restored` event (`storage.rs`): a single `Event` on `Storage` that is notified after every restore attempt (success or failure). Waiting threads subscribe to it, then re-check the flags under lock after waking.

`EventListener::wait()` (`event.rs`): a synchronous blocking wrapper around `event_listener`'s `.wait()`, needed because restore logic runs in synchronous backend operation contexts (not async).

Restore protocol (`operation/mod.rs`): all three restore entry points (`task()`, `task_pair()`, `prepare_tasks_with_callback()`) follow the same pattern:

- If the `*_restoring` bit is already set, mark the task as "wait"; otherwise, set the bit and claim the I/O.
- After performing the I/O, set `*_restored`, clear `*_restoring`, then `notify()` the `restored` event so waiters wake.
- Waiters (threads that observed `*_restoring` set) register a listener on `restored` before re-checking the flags (to avoid lost wakeups), then loop until `*_restored` is set, or until `*_restoring` clears without `*_restored` being set (error path).
- `prepare_tasks_with_callback()` batches this into two explicit phases: Phase 1 claims and performs all I/O (using the batch backing-store API when there are multiple tasks), then Phase 2 waits for any tasks that were being restored by another thread.

The implementation extracts two shared helpers to avoid repetition across the three call sites:

- `apply_restore_result()`: clears the restoring bit and merges storage on success, or returns the error for the caller to panic after notifying waiters.
- `wait_for_restore_or_panic()`: wraps `wait_for_restoring_task()` and panics with a clear message on failure.
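The `apply_restore_result()` helper described above can be sketched like this. The signature is a hypothetical simplification (flags struct, `Vec<u8>` payload, and `String` errors stand in for the real backend types); it shows the key invariant that the restoring bit is always cleared, while the restored bit is only set on success:

```rust
#[derive(Default, Debug)]
struct CategoryFlags {
    restoring: bool,
    restored: bool,
}

// Clears the restoring bit and, on success, marks the category restored and
// hands the data back for merging. On error the restoring bit is still
// cleared without restored being set, which is exactly the state waiters
// interpret as "restore failed": the caller then notifies the event and
// panics with full context.
fn apply_restore_result(
    flags: &mut CategoryFlags,
    result: Result<Vec<u8>, String>,
) -> Result<Option<Vec<u8>>, String> {
    debug_assert!(!flags.restored, "category already restored");
    flags.restoring = false; // always cleared, success or failure
    match result {
        Ok(data) => {
            flags.restored = true; // waiters observe success
            Ok(Some(data)) // real code merges this into task storage
        }
        Err(e) => Err(e),
    }
}
```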