Skip to content

Commit 612d672

Browse files
committed
[service-protocol] Add support for suspension message V7
Summary: This adds support for choosing which suspension message to decode based on the protocol version. It also introduces the Suspension message V7, based on the new protocol.proto schema, and updates the invocation Suspended state to store and handle the awaiting_on field.
1 parent 552b115 commit 612d672

41 files changed

Lines changed: 2480 additions & 289 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci.yml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,23 @@ jobs:
228228
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}
229229
serviceImage: "ghcr.io/restatedev/test-services-node:main"
230230

231+
sdk-typescript-protocol-v7:
232+
name: Run SDK-Typescript integration tests with Protocol V7
233+
permissions:
234+
contents: read
235+
issues: read
236+
checks: write
237+
pull-requests: write
238+
actions: read
239+
secrets: inherit
240+
needs: docker
241+
uses: restatedev/sdk-typescript/.github/workflows/integration.yaml@main
242+
with:
243+
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}
244+
envVars: |
245+
RESTATE_experimental_allow_protocol_v7=true
246+
testArtifactOutput: sdk-typescript-protocol-v7
247+
231248
sdk-rust:
232249
name: Run SDK-Rust integration tests
233250
permissions:

crates/invoker-api/src/effects.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,24 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
use std::collections::HashSet;
12+
1113
use restate_memory::NonZeroByteCount;
1214
use restate_types::deployment::PinnedDeployment;
1315
use restate_types::errors::InvocationError;
1416
use restate_types::identifiers::InvocationId;
1517
use restate_types::journal::EntryIndex;
1618
use restate_types::journal::enriched::EnrichedRawEntry;
1719
use restate_types::journal_events::raw::RawEvent;
18-
use restate_types::journal_v2;
1920
use restate_types::journal_v2::CommandIndex;
2021
use restate_types::journal_v2::raw::RawEntry;
22+
use restate_types::journal_v2::{self, UnresolvedFuture};
2123
use restate_types::storage::{StoredRawEntry, StoredRawEntryHeader};
2224
use restate_types::time::MillisSinceEpoch;
23-
use std::collections::HashSet;
2425

2526
use crate::EffectKind::JournalEntryV2;
2627

27-
#[derive(Debug, Clone, PartialEq, Eq)]
28+
#[derive(Debug, Clone)]
2829
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
2930
pub struct Effect {
3031
pub invocation_id: InvocationId,
@@ -37,7 +38,7 @@ pub struct Effect {
3738
pub kind: EffectKind,
3839
}
3940

40-
#[derive(Debug, Clone, PartialEq, Eq)]
41+
#[derive(Debug, Clone)]
4142
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
4243
// todo: fix this and box the large variant (EffectKind is 320 bytes)
4344
#[allow(clippy::large_enum_variant)]
@@ -62,8 +63,16 @@ pub enum EffectKind {
6263
event: RawEvent,
6364
},
6465
SuspendedV2 {
66+
/// Flattened set of notification ids
67+
/// can be thought of as `UnresolvedFuture::unknown(waiting_for_notifications)`
6568
waiting_for_notifications: HashSet<journal_v2::NotificationId>,
6669
},
70+
// Introduced in Restate v1.7, together with the new service-protocol v7.
71+
SuspendedV3 {
72+
/// Future tree describing the notifications this invocation is waiting on.
73+
/// Introduced in Restate v1.7 (protocol version V7). Older protocol versions use `SuspendedV2` with a flattened notification set instead.
74+
awaiting_on: UnresolvedFuture,
75+
},
6776
Paused {
6877
paused_event: RawEvent,
6978
},

crates/invoker-impl/src/invocation_task/mod.rs

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,8 @@ use restate_types::identifiers::{InvocationId, PartitionLeaderEpoch};
4646
use restate_types::invocation::InvocationTarget;
4747
use restate_types::journal::EntryIndex;
4848
use restate_types::journal::enriched::EnrichedRawEntry;
49-
use restate_types::journal_v2;
5049
use restate_types::journal_v2::raw::RawNotification;
51-
use restate_types::journal_v2::{CommandIndex, NotificationId};
50+
use restate_types::journal_v2::{self, CommandIndex, NotificationId, UnresolvedFuture};
5251
use restate_types::live::Live;
5352
use restate_types::schema::deployment::DeploymentResolver;
5453
use restate_types::schema::invocation_target::InvocationTargetResolver;
@@ -85,6 +84,10 @@ const SERVICE_PROTOCOL_VERSION_V5: HeaderValue =
8584
const SERVICE_PROTOCOL_VERSION_V6: HeaderValue =
8685
HeaderValue::from_static("application/vnd.restate.invocation.v6");
8786

87+
#[allow(clippy::declare_interior_mutable_const)]
88+
const SERVICE_PROTOCOL_VERSION_V7: HeaderValue =
89+
HeaderValue::from_static("application/vnd.restate.invocation.v7");
90+
8891
#[allow(clippy::declare_interior_mutable_const)]
8992
const X_RESTATE_SERVER: HeaderName = HeaderName::from_static("x-restate-server");
9093

@@ -180,6 +183,7 @@ pub(super) enum InvocationTaskOutputInner {
180183
Closed,
181184
Suspended(HashSet<EntryIndex>),
182185
SuspendedV2(HashSet<NotificationId>),
186+
SuspendedV3(UnresolvedFuture),
183187
Failed(InvokerError, LocalMemoryPool),
184188
/// The invocation task yielded due to memory pressure.
185189
/// The budget was dropped, returning memory to the global pool.
@@ -258,6 +262,8 @@ pub(super) struct InvocationTask<EE, DMR> {
258262

259263
// throttling
260264
action_token_bucket: Option<TokenBucket>,
265+
266+
allow_protocol_v7: bool,
261267
}
262268

263269
/// This is needed to split the run_internal in multiple loop functions and have shortcircuiting.
@@ -266,6 +272,7 @@ enum TerminalLoopState<T> {
266272
Closed,
267273
Suspended(HashSet<EntryIndex>),
268274
SuspendedV2(HashSet<NotificationId>),
275+
SuspendedV3(UnresolvedFuture),
269276
Failed(InvokerError),
270277
/// Memory budget exhausted — the invocation should yield.
271278
ShouldYield(InvocationMemoryExhausted),
@@ -305,6 +312,7 @@ macro_rules! shortcircuit {
305312
TerminalLoopState::Closed => return TerminalLoopState::Closed,
306313
TerminalLoopState::Suspended(v) => return TerminalLoopState::Suspended(v),
307314
TerminalLoopState::SuspendedV2(v) => return TerminalLoopState::SuspendedV2(v),
315+
TerminalLoopState::SuspendedV3(v) => return TerminalLoopState::SuspendedV3(v),
308316
TerminalLoopState::ShouldYield(oom) => return TerminalLoopState::ShouldYield(oom),
309317
TerminalLoopState::Failed(e) => return TerminalLoopState::Failed(e),
310318
}
@@ -333,6 +341,7 @@ where
333341
invoker_tx: mpsc::UnboundedSender<InvocationTaskOutput>,
334342
invoker_rx: mpsc::UnboundedReceiver<Notification>,
335343
action_token_bucket: Option<TokenBucket>,
344+
allow_protocol_v7: bool,
336345
) -> Self {
337346
Self {
338347
client,
@@ -350,6 +359,7 @@ where
350359
message_size_warning,
351360
retry_count_since_last_stored_entry,
352361
action_token_bucket,
362+
allow_protocol_v7,
353363
}
354364
}
355365

@@ -388,6 +398,7 @@ where
388398
TerminalLoopState::Closed => InvocationTaskOutputInner::Closed,
389399
TerminalLoopState::Suspended(v) => InvocationTaskOutputInner::Suspended(v),
390400
TerminalLoopState::SuspendedV2(v) => InvocationTaskOutputInner::SuspendedV2(v),
401+
TerminalLoopState::SuspendedV3(v) => InvocationTaskOutputInner::SuspendedV3(v),
391402
TerminalLoopState::Failed(e) => {
392403
// Best effort to release excessive memory. Note there can still be effects in flight
393404
// that are being replicated and thereby occupy memory. Best if we periodically check
@@ -478,13 +489,16 @@ where
478489
);
479490

480491
let chosen_service_protocol_version = shortcircuit!(
481-
ServiceProtocolVersion::pick(&deployment.supported_protocol_versions,)
482-
.ok_or_else(|| {
483-
InvokerError::IncompatibleServiceEndpoint(
484-
deployment.id,
485-
deployment.supported_protocol_versions.clone(),
486-
)
487-
})
492+
ServiceProtocolVersion::pick(
493+
&deployment.supported_protocol_versions,
494+
self.allow_protocol_v7
495+
)
496+
.ok_or_else(|| {
497+
InvokerError::IncompatibleServiceEndpoint(
498+
deployment.id,
499+
deployment.supported_protocol_versions.clone(),
500+
)
501+
})
488502
);
489503

490504
(
@@ -600,6 +614,7 @@ fn service_protocol_version_to_header_value(
600614
ServiceProtocolVersion::V4 => SERVICE_PROTOCOL_VERSION_V4,
601615
ServiceProtocolVersion::V5 => SERVICE_PROTOCOL_VERSION_V5,
602616
ServiceProtocolVersion::V6 => SERVICE_PROTOCOL_VERSION_V6,
617+
ServiceProtocolVersion::V7 => SERVICE_PROTOCOL_VERSION_V7,
603618
}
604619
}
605620

crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11-
use std::collections::HashSet;
1211
use std::num::NonZeroUsize;
1312
use std::ops::Deref;
1413
use std::pin::Pin;
@@ -38,7 +37,7 @@ use restate_service_protocol_v4::entry_codec::ServiceProtocolV4Codec;
3837
use restate_service_protocol_v4::message_codec::{
3938
Decoder, Encoder, Message, MessageHeader, MessageType, StateEntry, proto,
4039
};
41-
use restate_types::errors::InvocationError;
40+
use restate_types::errors::{GenericError, InvocationError};
4241
use restate_types::identifiers::InvocationId;
4342
use restate_types::identifiers::ServiceId;
4443
use restate_types::invocation::{
@@ -51,7 +50,7 @@ use restate_types::journal_v2::command::{
5150
};
5251
use restate_types::journal_v2::raw::{RawCommand, RawEntry, RawNotification};
5352
use restate_types::journal_v2::{
54-
CommandIndex, CommandType, Entry, EntryType, NotificationId, RunCompletion, RunResult, SignalId,
53+
CommandIndex, CommandType, Entry, EntryType, RunCompletion, RunResult, UnresolvedFuture,
5554
};
5655
use restate_types::schema::deployment::{Deployment, DeploymentType, ProtocolType};
5756
use restate_types::schema::invocation_target::{DeploymentStatus, InvocationTargetResolver};
@@ -849,6 +848,7 @@ where
849848
MessageType::CommandAck,
850849
)),
851850
Message::Suspension(suspension) => self.handle_suspension_message(suspension),
851+
Message::AwaitingOn(awaiting_on) => self.handle_awaiting_on_message(awaiting_on),
852852
Message::Error(e) => self.handle_error_message(e),
853853
Message::End(_) => TerminalLoopState::Closed,
854854

@@ -1171,34 +1171,45 @@ where
11711171
}
11721172
}
11731173

1174+
fn handle_awaiting_on_message(
1175+
&mut self,
1176+
_awaiting_on: proto::AwaitingOnMessage,
1177+
) -> TerminalLoopState<()> {
1178+
// This message should mark this invocation as suspendable,
1179+
// as long as it's not running any side effects.
1180+
1181+
// todo(azmy): Handle the AwaitingOn message
1182+
TerminalLoopState::Continue(())
1183+
}
1184+
11741185
fn handle_suspension_message(
11751186
&mut self,
11761187
suspension: proto::SuspensionMessage,
11771188
) -> TerminalLoopState<()> {
1178-
let suspension_indexes: HashSet<_> = suspension
1179-
.waiting_completions
1180-
.into_iter()
1181-
.map(NotificationId::for_completion)
1182-
.chain(
1183-
suspension
1184-
.waiting_signals
1185-
.into_iter()
1186-
.map(SignalId::for_index)
1187-
.map(NotificationId::for_signal),
1188-
)
1189-
.chain(
1190-
suspension
1191-
.waiting_named_signals
1192-
.into_iter()
1193-
.map(|s| SignalId::for_name(s.into()))
1194-
.map(NotificationId::for_signal),
1195-
)
1196-
.collect();
1197-
// We currently don't support empty suspension_indexes set
1198-
if suspension_indexes.is_empty() {
1189+
let Some(awaiting_on) = suspension.awaiting_on else {
11991190
return TerminalLoopState::Failed(InvokerError::EmptySuspensionMessage);
1191+
};
1192+
1193+
let future: UnresolvedFuture = match awaiting_on.try_into().map_err(GenericError::from) {
1194+
Ok(future) => future,
1195+
Err(err) => return TerminalLoopState::Failed(InvokerError::EncodingV2(err.into())),
1196+
};
1197+
1198+
// We currently don't support an empty future set
1199+
if future.is_empty() {
1200+
return TerminalLoopState::Failed(InvokerError::EmptySuspensionMessage);
1201+
}
1202+
1203+
match self.service_protocol_version {
1204+
ServiceProtocolVersion::Unspecified => unreachable!(),
1205+
ServiceProtocolVersion::V1
1206+
| ServiceProtocolVersion::V2
1207+
| ServiceProtocolVersion::V3
1208+
| ServiceProtocolVersion::V4
1209+
| ServiceProtocolVersion::V5
1210+
| ServiceProtocolVersion::V6 => TerminalLoopState::SuspendedV2(future.flatten()),
1211+
ServiceProtocolVersion::V7 => TerminalLoopState::SuspendedV3(future),
12001212
}
1201-
TerminalLoopState::SuspendedV2(suspension_indexes)
12021213
}
12031214

12041215
fn handle_error_message(&mut self, error: proto::ErrorMessage) -> TerminalLoopState<()> {

0 commit comments

Comments
 (0)