[service-protocol] Add support for suspension message V7#4601
[service-protocol] Add support for suspension message V7#4601muhamadazmy wants to merge 2 commits intorestatedev:mainfrom
Conversation
0a01200 to
66d91b0
Compare
6dba403 to
8e71340
Compare
slinkydeveloper
left a comment
There was a problem hiding this comment.
followup here is to enable via env the new protocol version negotiation in types service_protocol.rs, like was done back in this PR 2ce3607#diff-6442aa1f30aedc9c7fc4b6536f57b25d548f18508fc1e27195fe53eb001c8927R48
This way we can test the handling of the message already.
1249780 to
acb54e7
Compare
3307bc9 to
477efa7
Compare
| } | ||
| InvocationTaskOutputInner::SuspendedV2(notification_ids) => { | ||
| self.handle_invocation_task_suspended_v2(partition, invocation_id, notification_ids).await | ||
| InvocationTaskOutputInner::SuspendedV2(combinator) => { |
There was a problem hiding this comment.
here and everywhere you use the word combinator for future, replace it with future :)
| partition: PartitionLeaderEpoch, | ||
| invocation_id: InvocationId, | ||
| waiting_for_notifications: HashSet<NotificationId>, | ||
| combinator: UnresolvedFuture, |
| // service protocol messages. Otherwise, crates depending only on this feature fail clippy. | ||
| #[allow(dead_code)] | ||
| mod proto { | ||
| pub mod proto { |
| // If set, ignore the awaiting_for_* notifications above (17, 23, and 24) | ||
| // the awaiting_for_* are written as a flattened version of awaiting_on | ||
| // for Restate <= v1.7 | ||
| optional dev.restate.service.protocol.Future awaiting_on = 32; |
There was a problem hiding this comment.
optional is not needed here, it's ignored for non-primitive fields.
e41bdbf to
418ba05
Compare
slinkydeveloper
left a comment
There was a problem hiding this comment.
TODOs as followup to complete this:
- we need to make sure both sys_invocation_status and sys_invocation_state propagate the new unresolved future info.
| Suspended { | ||
| metadata: InFlightInvocationMetadata, | ||
| waiting_for_notifications: HashSet<NotificationId>, | ||
| awaiting_on: UnresolvedFuture, |
ab5f4ae to
807d98b
Compare
slinkydeveloper
left a comment
There was a problem hiding this comment.
another thing that we're missing is normalization of the future before proposing it in the invoker effects. we can assume the sdk behaves fine, but we must solve this before release to be defensive against sdk bugs.
0acf62d to
2e9934f
Compare
…e/decode Summary: This PR allows the messages (MessageType) to know the current service-protocol-version and then decide how to decode/encode to work the support version for the deployment. Note: For opaque messages, these are written as is to bifrost, and hence can't be verified against the select protocol-version.
Summary: This adds support to handle which suspension message to decode based on the protocol version. It also introduce the Suspension message V7 based on the new protocol.proto schema Update the invocation Suspended state to store and handle the awaiting on field
| impl Default for UnresolvedFuture { | ||
| fn default() -> Self { | ||
| Self::Unknown(Vec::default()) | ||
| } | ||
| } |
There was a problem hiding this comment.
imo you should remove this too, as this is at all effects an invalid future.
| // Should be treated as FirstCompleted, | ||
| // Used with Suspension V2 to indicate that | ||
| // the sdk did not provide a combinator kind | ||
| #[default] |
There was a problem hiding this comment.
let's remove this default too, i think it doesn't make a lot of sense
| // Used with Suspension V2 to indicate that | ||
| // the sdk did not provide a combinator kind |
There was a problem hiding this comment.
yes, but this is also used by the sdk too sometimes.
There was a problem hiding this comment.
the Unknown has a subtle semantic difference with FirstCompleted:
FirstCompletedcompletes with success if the first completed child future was successful, or completes with failure if the first completed child future failed.- For
Unknown, if the first completed child future succeeds, we can't tell ifUnknownitself will be marked as success or failure. The SDK can flip the result from success to failure, or viceversa.
| self | ||
| } | ||
|
|
||
| pub fn build(self) -> UnresolvedFuture { |
There was a problem hiding this comment.
shall we fail if there's no childs here?
| } | ||
| } | ||
|
|
||
| pub struct UnresolvedFutureBuilder { |
There was a problem hiding this comment.
given you use this only when doing tryfrom/into for storage protobuf conversions, maybe just move it there to keep this file slimmer?
There was a problem hiding this comment.
in tests you made those factory methods, you can just use those 🤷
There was a problem hiding this comment.
The tryfrom/into implementation is in 2 different palce (one for the protocol and the other for the storage) so it's better to have it in a common place
| pub enum RawNotificationResultVariant { | ||
| /// `Unknown` indicates no result is stored, which is the case | ||
| /// for notifications created before Restate v1.7. | ||
| #[default] |
There was a problem hiding this comment.
Not a big fan of having these defaults, especially for these unknown situations, they're a recipe for misuse :D
There was a problem hiding this comment.
I think this one is useful with serde. I think we should keep it. this way older notification entries with no result is set to unknown
[service-protocol] Add support for suspension message V7
Summary:
This adds support to handle which suspension message to decode
based on the protocol version. It also introduce the Suspension message V7
based on the new protocol.proto schema
Update the invocation Suspended state to store and handle
the awaiting on field
Stack created with Sapling. Best reviewed with ReviewStack.