Skip to content

Commit 33c89eb

Browse files
committed
[service-protocol] Add support for suspension message V7
Summary: This adds support to handle which suspension message to decode based on the protocol version. It also introduce the Suspension message V7 based on the new protocol.proto schema Update the invocation Suspended state to store and handle the awaiting on field
1 parent a3be103 commit 33c89eb

38 files changed

Lines changed: 1511 additions & 206 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/invoker-api/src/effects.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,24 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
use std::collections::HashSet;
12+
1113
use restate_memory::NonZeroByteCount;
1214
use restate_types::deployment::PinnedDeployment;
1315
use restate_types::errors::InvocationError;
1416
use restate_types::identifiers::InvocationId;
1517
use restate_types::journal::EntryIndex;
1618
use restate_types::journal::enriched::EnrichedRawEntry;
1719
use restate_types::journal_events::raw::RawEvent;
18-
use restate_types::journal_v2;
1920
use restate_types::journal_v2::CommandIndex;
2021
use restate_types::journal_v2::raw::RawEntry;
22+
use restate_types::journal_v2::{self, UnresolvedFuture};
2123
use restate_types::storage::{StoredRawEntry, StoredRawEntryHeader};
2224
use restate_types::time::MillisSinceEpoch;
23-
use std::collections::HashSet;
2425

2526
use crate::EffectKind::JournalEntryV2;
2627

27-
#[derive(Debug, Clone, PartialEq, Eq)]
28+
#[derive(Debug, Clone)]
2829
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
2930
pub struct Effect {
3031
pub invocation_id: InvocationId,
@@ -37,7 +38,7 @@ pub struct Effect {
3738
pub kind: EffectKind,
3839
}
3940

40-
#[derive(Debug, Clone, PartialEq, Eq)]
41+
#[derive(Debug, Clone)]
4142
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
4243
// todo: fix this and box the large variant (EffectKind is 320 bytes)
4344
#[allow(clippy::large_enum_variant)]
@@ -62,7 +63,15 @@ pub enum EffectKind {
6263
event: RawEvent,
6364
},
6465
SuspendedV2 {
66+
/// Flattened set of notification ids derived from `awaiting_on`.
67+
/// Present only for backward compatibility with Restate < v1.7;
68+
/// Restate >= v1.7 should use the `awaiting_on` combinator instead.
69+
/// todo(azmy): drop in version >= v1.8
6570
waiting_for_notifications: HashSet<journal_v2::NotificationId>,
71+
/// Combinator tree describing the notifications this invocation is waiting on.
72+
/// Introduced in Restate v1.7 (protocol version V7). `None` for older invocations.
73+
/// todo(azmy): make required in >= v1.8
74+
awaiting_on: Option<UnresolvedFuture>,
6675
},
6776
Paused {
6877
paused_event: RawEvent,

crates/invoker-impl/src/invocation_task/mod.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,8 @@ use restate_types::identifiers::{InvocationId, PartitionLeaderEpoch};
4646
use restate_types::invocation::InvocationTarget;
4747
use restate_types::journal::EntryIndex;
4848
use restate_types::journal::enriched::EnrichedRawEntry;
49-
use restate_types::journal_v2;
5049
use restate_types::journal_v2::raw::RawNotification;
51-
use restate_types::journal_v2::{CommandIndex, NotificationId};
50+
use restate_types::journal_v2::{self, CommandIndex, UnresolvedFuture};
5251
use restate_types::live::Live;
5352
use restate_types::schema::deployment::DeploymentResolver;
5453
use restate_types::schema::invocation_target::InvocationTargetResolver;
@@ -85,6 +84,10 @@ const SERVICE_PROTOCOL_VERSION_V5: HeaderValue =
8584
const SERVICE_PROTOCOL_VERSION_V6: HeaderValue =
8685
HeaderValue::from_static("application/vnd.restate.invocation.v6");
8786

87+
#[allow(clippy::declare_interior_mutable_const)]
88+
const SERVICE_PROTOCOL_VERSION_V7: HeaderValue =
89+
HeaderValue::from_static("application/vnd.restate.invocation.v7");
90+
8891
#[allow(clippy::declare_interior_mutable_const)]
8992
const X_RESTATE_SERVER: HeaderName = HeaderName::from_static("x-restate-server");
9093

@@ -179,7 +182,7 @@ pub(super) enum InvocationTaskOutputInner {
179182
},
180183
Closed,
181184
Suspended(HashSet<EntryIndex>),
182-
SuspendedV2(HashSet<NotificationId>),
185+
SuspendedV2(UnresolvedFuture),
183186
Failed(InvokerError, LocalMemoryPool),
184187
/// The invocation task yielded due to memory pressure.
185188
/// The budget was dropped, returning memory to the global pool.
@@ -265,7 +268,7 @@ enum TerminalLoopState<T> {
265268
Continue(T),
266269
Closed,
267270
Suspended(HashSet<EntryIndex>),
268-
SuspendedV2(HashSet<NotificationId>),
271+
SuspendedV2(UnresolvedFuture),
269272
Failed(InvokerError),
270273
/// Memory budget exhausted — the invocation should yield.
271274
ShouldYield(InvocationMemoryExhausted),
@@ -600,6 +603,7 @@ fn service_protocol_version_to_header_value(
600603
ServiceProtocolVersion::V4 => SERVICE_PROTOCOL_VERSION_V4,
601604
ServiceProtocolVersion::V5 => SERVICE_PROTOCOL_VERSION_V5,
602605
ServiceProtocolVersion::V6 => SERVICE_PROTOCOL_VERSION_V6,
606+
ServiceProtocolVersion::V7 => SERVICE_PROTOCOL_VERSION_V7,
603607
}
604608
}
605609

crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11-
use std::collections::HashSet;
1211
use std::num::NonZeroUsize;
1312
use std::ops::Deref;
1413
use std::pin::Pin;
@@ -38,7 +37,7 @@ use restate_service_protocol_v4::entry_codec::ServiceProtocolV4Codec;
3837
use restate_service_protocol_v4::message_codec::{
3938
Decoder, Encoder, Message, MessageHeader, MessageType, StateEntry, proto,
4039
};
41-
use restate_types::errors::InvocationError;
40+
use restate_types::errors::{GenericError, InvocationError};
4241
use restate_types::identifiers::InvocationId;
4342
use restate_types::identifiers::ServiceId;
4443
use restate_types::invocation::{
@@ -51,7 +50,7 @@ use restate_types::journal_v2::command::{
5150
};
5251
use restate_types::journal_v2::raw::{RawCommand, RawEntry, RawNotification};
5352
use restate_types::journal_v2::{
54-
CommandIndex, CommandType, Entry, EntryType, NotificationId, RunCompletion, RunResult, SignalId,
53+
CommandIndex, CommandType, Entry, EntryType, RunCompletion, RunResult, UnresolvedFuture,
5554
};
5655
use restate_types::schema::deployment::{Deployment, DeploymentType, ProtocolType};
5756
use restate_types::schema::invocation_target::{DeploymentStatus, InvocationTargetResolver};
@@ -849,6 +848,7 @@ where
849848
MessageType::CommandAck,
850849
)),
851850
Message::Suspension(suspension) => self.handle_suspension_message(suspension),
851+
Message::AwaitingOn(awaiting_on) => self.handle_awaiting_on_message(awaiting_on),
852852
Message::Error(e) => self.handle_error_message(e),
853853
Message::End(_) => TerminalLoopState::Closed,
854854

@@ -1171,34 +1171,35 @@ where
11711171
}
11721172
}
11731173

1174+
fn handle_awaiting_on_message(
1175+
&mut self,
1176+
_awaiting_on: proto::AwaitingOnMessage,
1177+
) -> TerminalLoopState<()> {
1178+
// this message should mark this invocation as suspendable.
1179+
// if it's not running any side effects.
1180+
todo!("Handle awaiting on message");
1181+
}
1182+
11741183
fn handle_suspension_message(
11751184
&mut self,
11761185
suspension: proto::SuspensionMessage,
11771186
) -> TerminalLoopState<()> {
1178-
let suspension_indexes: HashSet<_> = suspension
1179-
.waiting_completions
1180-
.into_iter()
1181-
.map(NotificationId::for_completion)
1182-
.chain(
1183-
suspension
1184-
.waiting_signals
1185-
.into_iter()
1186-
.map(SignalId::for_index)
1187-
.map(NotificationId::for_signal),
1188-
)
1189-
.chain(
1190-
suspension
1191-
.waiting_named_signals
1192-
.into_iter()
1193-
.map(|s| SignalId::for_name(s.into()))
1194-
.map(NotificationId::for_signal),
1195-
)
1196-
.collect();
1197-
// We currently don't support empty suspension_indexes set
1198-
if suspension_indexes.is_empty() {
1187+
let Some(awaiting_on) = suspension.awaiting_on else {
1188+
return TerminalLoopState::Failed(InvokerError::EmptySuspensionMessage);
1189+
};
1190+
1191+
let combinator: UnresolvedFuture = match awaiting_on.try_into().map_err(GenericError::from)
1192+
{
1193+
Ok(combinator) => combinator,
1194+
Err(err) => return TerminalLoopState::Failed(InvokerError::EncodingV2(err.into())),
1195+
};
1196+
1197+
// We currently don't support empty combinator set
1198+
if combinator.is_empty() {
11991199
return TerminalLoopState::Failed(InvokerError::EmptySuspensionMessage);
12001200
}
1201-
TerminalLoopState::SuspendedV2(suspension_indexes)
1201+
1202+
TerminalLoopState::SuspendedV2(combinator)
12021203
}
12031204

12041205
fn handle_error_message(&mut self, error: proto::ErrorMessage) -> TerminalLoopState<()> {

crates/invoker-impl/src/lib.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ mod quota;
1818
mod state_machine_manager;
1919
mod status_store;
2020

21+
use std::cmp::Ordering;
2122
use std::collections::{HashMap, HashSet};
2223
use std::io::ErrorKind;
2324
use std::ops::RangeInclusive;
@@ -30,14 +31,16 @@ use futures::StreamExt;
3031
use gardal::futures::ThrottledStream;
3132
use gardal::{PaddedAtomicSharedStorage, StreamExt as GardalStreamExt, TokioClock};
3233
use metrics::counter;
33-
use restate_futures_util::concurrency::Permit;
3434
use tokio::sync::mpsc;
3535
use tokio::task::{AbortHandle, JoinSet};
36+
use tokio_util::time::DelayQueue;
37+
use tokio_util::time::delay_queue::Key as RetryTimerKey;
3638
use tracing::instrument;
3739
use tracing::{debug, trace, warn};
3840

3941
use restate_core::cancellation_token;
4042
use restate_errors::warn_it;
43+
use restate_futures_util::concurrency::Permit;
4144
use restate_invoker_api::capacity::TokenBucket;
4245
use restate_invoker_api::invocation_reader::InvocationReader;
4346
use restate_invoker_api::{
@@ -57,12 +60,11 @@ use restate_types::journal::enriched::EnrichedRawEntry;
5760
use restate_types::journal_events::raw::RawEvent;
5861
use restate_types::journal_events::{Event, PausedEvent, TransientErrorEvent};
5962
use restate_types::journal_v2::raw::{RawCommand, RawNotification};
60-
use restate_types::journal_v2::{CommandIndex, EntryMetadata, NotificationId};
63+
use restate_types::journal_v2::{CommandIndex, EntryMetadata, NotificationId, UnresolvedFuture};
6164
use restate_types::live::{Live, LiveLoad};
6265
use restate_types::schema::deployment::DeploymentResolver;
6366
use restate_types::schema::invocation_target::InvocationTargetResolver;
64-
use tokio_util::time::DelayQueue;
65-
use tokio_util::time::delay_queue::Key as RetryTimerKey;
67+
use restate_types::{RESTATE_VERSION_1_8_0, SemanticRestateVersion};
6668

6769
use crate::error::InvocationMemoryExhausted;
6870
use crate::error::InvokerError;
@@ -563,8 +565,8 @@ where
563565
requires_ack
564566
).await
565567
}
566-
InvocationTaskOutputInner::SuspendedV2(notification_ids) => {
567-
self.handle_invocation_task_suspended_v2(partition, invocation_id, notification_ids).await
568+
InvocationTaskOutputInner::SuspendedV2(combinator) => {
569+
self.handle_invocation_task_suspended_v2(partition, invocation_id, combinator).await
568570
}
569571
InvocationTaskOutputInner::ShouldYield { oom, budget } => {
570572
self.handle_invocation_task_should_yield(partition, invocation_id, oom, budget).await
@@ -1161,7 +1163,7 @@ where
11611163
&mut self,
11621164
partition: PartitionLeaderEpoch,
11631165
invocation_id: InvocationId,
1164-
waiting_for_notifications: HashSet<NotificationId>,
1166+
combinator: UnresolvedFuture,
11651167
) {
11661168
if let Some((sender, _, ism)) = self
11671169
.invocation_state_machine_manager
@@ -1198,7 +1200,18 @@ where
11981200
.send(Box::new(Effect {
11991201
invocation_id,
12001202
kind: EffectKind::SuspendedV2 {
1201-
waiting_for_notifications,
1203+
waiting_for_notifications: if SemanticRestateVersion::current()
1204+
.cmp_precedence(&RESTATE_VERSION_1_8_0)
1205+
== Ordering::Less
1206+
{
1207+
// for all versions before v1.8 we keep writing
1208+
// the flatten notification ids set.
1209+
combinator.flatten()
1210+
} else {
1211+
// we stop writing the waiting_for_notifications for versions >= v1.8
1212+
HashSet::default()
1213+
},
1214+
awaiting_on: Some(combinator),
12021215
},
12031216
}))
12041217
.await;

crates/metadata-server-grpc/src/grpc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub mod pb_conversions {
6868
fn try_from(kv_entry: KvEntry) -> Result<Self, Self::Error> {
6969
Ok((
7070
ByteString::try_from(kv_entry.key)
71-
.map_err(|_| ConversionError::invalid_data("key"))?,
71+
.map_err(|_| ConversionError::invalid_data_static("key"))?,
7272
kv_entry
7373
.value
7474
.ok_or(ConversionError::missing_field("value"))?

crates/metadata-server/src/grpc/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,15 @@ pub mod pb_conversions {
9595
.into();
9696

9797
let request = match WriteRequestKind::try_from(value.kind)
98-
.map_err(|_| ConversionError::invalid_data("kind"))?
98+
.map_err(|_| ConversionError::invalid_data_static("kind"))?
9999
{
100100
WriteRequestKind::Delete => Self {
101101
request_id,
102102
kind: crate::RequestKind::Delete {
103103
key: value
104104
.key
105105
.try_into()
106-
.map_err(|_| ConversionError::invalid_data("key"))?,
106+
.map_err(|_| ConversionError::invalid_data_static("key"))?,
107107
precondition: value
108108
.precondition
109109
.ok_or_else(|| ConversionError::missing_field("precondition"))?
@@ -116,7 +116,7 @@ pub mod pb_conversions {
116116
key: value
117117
.key
118118
.try_into()
119-
.map_err(|_| ConversionError::invalid_data("key"))?,
119+
.map_err(|_| ConversionError::invalid_data_static("key"))?,
120120
value: value
121121
.value
122122
.ok_or_else(|| ConversionError::missing_field("value"))?
@@ -127,7 +127,7 @@ pub mod pb_conversions {
127127
.try_into()?,
128128
},
129129
},
130-
_ => return Err(ConversionError::InvalidData("kind")),
130+
_ => return Err(ConversionError::invalid_data_static("kind")),
131131
};
132132

133133
Ok(request)

crates/partition-store/src/journal_table_v2/mod.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ use rocksdb::{DBAccess, DBRawIteratorWithThreadMode};
1818
use restate_memory::{LocalMemoryLease, LocalMemoryPool};
1919
use restate_rocksdb::{Priority, RocksDbPerfGuard};
2020
use restate_storage_api::journal_table_v2::{
21-
JournalEntryIndex, ReadJournalTable, ScanJournalTable, ScanJournalTableRange, StoredEntry,
22-
WriteJournalTable,
21+
JournalEntryIndex, NotificationEntryIndex, ReadJournalTable, ScanJournalTable,
22+
ScanJournalTableRange, StoredEntry, WriteJournalTable,
2323
};
2424
use restate_storage_api::protobuf_types::PartitionStoreProtobufValue;
2525
use restate_storage_api::{BudgetedReadError, Result, StorageError};
@@ -149,7 +149,10 @@ fn put_journal_entry<S: StorageAccess>(
149149
invocation_uuid: invocation_id.invocation_uuid(),
150150
notification_id: notification.id(),
151151
},
152-
&JournalEntryIndex(journal_index),
152+
&NotificationEntryIndex {
153+
entry_index: journal_index,
154+
result_variant: notification.result_variant(),
155+
},
153156
)?;
154157
} else if let RawEntry::Command(_) = &journal_entry.inner {
155158
for completion_id in related_completion_ids {
@@ -419,7 +422,7 @@ fn has_journal_entries(
419422
fn get_notifications_index<S: StorageAccess>(
420423
storage: &mut S,
421424
invocation_id: InvocationId,
422-
) -> Result<HashMap<NotificationId, EntryIndex>> {
425+
) -> Result<HashMap<NotificationId, NotificationEntryIndex>> {
423426
let key = JournalNotificationIdToNotificationIndexKey::builder()
424427
.partition_key(invocation_id.partition_key())
425428
.invocation_uuid(invocation_id.invocation_uuid());
@@ -431,12 +434,12 @@ fn get_notifications_index<S: StorageAccess>(
431434
.map(|(mut key, mut value)| {
432435
let journal_key =
433436
JournalNotificationIdToNotificationIndexKey::deserialize_from(&mut key)?;
434-
let index = JournalEntryIndex::decode(&mut value)
437+
let index = NotificationEntryIndex::decode(&mut value)
435438
.map_err(|err| StorageError::Conversion(err.into()))?;
436439

437440
let (_, _, notification_id) = journal_key.split();
438441

439-
Ok((notification_id, index.0))
442+
Ok((notification_id, index))
440443
})
441444
.collect()
442445
}
@@ -523,7 +526,7 @@ impl ReadJournalTable for PartitionStore {
523526
async fn get_notifications_index(
524527
&mut self,
525528
invocation_id: InvocationId,
526-
) -> Result<HashMap<NotificationId, EntryIndex>> {
529+
) -> Result<HashMap<NotificationId, NotificationEntryIndex>> {
527530
get_notifications_index(self, invocation_id)
528531
}
529532

@@ -655,7 +658,7 @@ impl ReadJournalTable for PartitionStoreTransaction<'_> {
655658
async fn get_notifications_index(
656659
&mut self,
657660
invocation_id: InvocationId,
658-
) -> Result<HashMap<NotificationId, EntryIndex>> {
661+
) -> Result<HashMap<NotificationId, NotificationEntryIndex>> {
659662
get_notifications_index(self, invocation_id)
660663
}
661664

crates/partition-store/src/tests/invocation_status_table_test/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ fn suspended_status(invocation_target: InvocationTarget) -> InvocationStatus {
120120
random_seed: None,
121121
},
122122
waiting_for_notifications: HashSet::default(),
123+
awaiting_on: None,
123124
}
124125
}
125126

0 commit comments

Comments
 (0)