Skip to content

Commit c060774

Browse files
[stable2603] Backport #11139 (#11166)
Backport #11139 into `stable2603` from alexggh. See the [documentation](https://github.com/paritytech/polkadot-sdk/blob/master/docs/BACKPORT.md) on how to use this bot. <!-- # To be used by other automation, do not modify: original-pr-number: #${pull_number} --> Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io> Co-authored-by: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com>
1 parent 5d00498 commit c060774

File tree

11 files changed

+410
-116
lines changed

11 files changed

+410
-116
lines changed

cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use std::time::Duration;
88

99
use sp_core::{Bytes, Encode};
10-
use sp_statement_store::{SubmitResult, Topic, TopicFilter};
10+
use sp_statement_store::{StatementEvent, SubmitResult, Topic, TopicFilter};
1111
use zombienet_sdk::subxt::ext::subxt_rpcs::rpc_params;
1212

1313
use crate::zombie_ci::statement_store_bench::{get_keypair, spawn_network};
@@ -39,7 +39,7 @@ async fn statement_store() -> Result<(), anyhow::Error> {
3939
// Subscribe to statements with topic "topic" to dave.
4040
let stop_after_secs = 20;
4141
let mut subscription = dave_rpc
42-
.subscribe::<Bytes>(
42+
.subscribe::<StatementEvent>(
4343
"statement_subscribeStatement",
4444
rpc_params![TopicFilter::MatchAll(vec![topic].try_into().expect("Single topic"))],
4545
"statement_unsubscribeStatement",
@@ -50,14 +50,27 @@ async fn statement_store() -> Result<(), anyhow::Error> {
5050
let _: SubmitResult =
5151
charlie_rpc.request("statement_submit", rpc_params![statement.clone()]).await?;
5252

53-
let statement_bytes =
54-
tokio::time::timeout(Duration::from_secs(stop_after_secs), subscription.next())
55-
.await
56-
.expect("Should not timeout")
57-
.expect("Should receive")
58-
.expect("Should not error");
53+
loop {
54+
let subscribe_item =
55+
tokio::time::timeout(Duration::from_secs(stop_after_secs), subscription.next())
56+
.await
57+
.expect("Should not timeout")
58+
.expect("Should receive")
59+
.expect("Should not error");
5960

60-
assert_eq!(statement_bytes, statement);
61+
let statement_bytes = match subscribe_item {
62+
StatementEvent::NewStatements { statements: mut batch, .. } => {
63+
if batch.is_empty() {
64+
continue;
65+
}
66+
assert_eq!(batch.len(), 1, "Expected exactly one statement in batch");
67+
batch.remove(0)
68+
},
69+
};
70+
assert_eq!(statement_bytes, statement);
71+
72+
break;
73+
}
6174
// Now make sure no more statements are received.
6275
assert!(tokio::time::timeout(Duration::from_secs(stop_after_secs), subscription.next())
6376
.await

cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use log::{debug, info, trace};
1010
use sc_statement_store::{DEFAULT_MAX_TOTAL_SIZE, DEFAULT_MAX_TOTAL_STATEMENTS};
1111
use sp_core::{blake2_256, hexdisplay::HexDisplay, sr25519, Bytes, Pair};
1212
use sp_statement_store::{
13-
statement_allowance_key, Channel, Statement, StatementAllowance, SubmitResult, Topic,
14-
TopicFilter,
13+
statement_allowance_key, Channel, Statement, StatementAllowance, StatementEvent, SubmitResult,
14+
Topic, TopicFilter,
1515
};
1616
use std::{
1717
cell::Cell,
@@ -614,7 +614,7 @@ impl Participant {
614614
for idx in &pending {
615615
let subscription = self
616616
.rpc_client
617-
.subscribe::<Bytes>(
617+
.subscribe::<StatementEvent>(
618618
"statement_subscribeStatement",
619619
rpc_params![TopicFilter::MatchAll(
620620
vec![topic_public_key(), topic_idx(*idx)].try_into().expect("Two topics")
@@ -628,13 +628,28 @@ impl Participant {
628628
let mut futures: FuturesUnordered<_> = subscriptions
629629
.into_iter()
630630
.map(|(idx, mut subscription)| async move {
631-
let statement_bytes =
632-
timeout(Duration::from_secs(SUBSCRIBE_TIMEOUT_SECS), subscription.next())
633-
.await
634-
.map_err(|_| anyhow!("Timeout waiting for session key"))?
635-
.ok_or_else(|| anyhow!("Subscription ended unexpectedly"))?
636-
.map_err(|e| anyhow!("Subscription error: {}", e))?;
637-
let statement = Statement::decode(&mut &statement_bytes[..])
631+
let mut batch;
632+
loop {
633+
let item =
634+
timeout(Duration::from_secs(SUBSCRIBE_TIMEOUT_SECS), subscription.next())
635+
.await
636+
.map_err(|_| anyhow!("Timeout waiting for session key"))?
637+
.ok_or_else(|| anyhow!("Subscription ended unexpectedly"))?
638+
.map_err(|e| anyhow!("Subscription error: {}", e))?;
639+
let StatementEvent::NewStatements { statements, .. } = item;
640+
if statements.is_empty() {
641+
continue; // Ignore empty batches
642+
} else {
643+
batch = statements;
644+
break;
645+
}
646+
}
647+
648+
if batch.len() != 1 {
649+
return Err(anyhow!("Expected exactly one statement, got: {}", batch.len()));
650+
}
651+
652+
let statement = Statement::decode(&mut &batch.remove(0)[..])
638653
.map_err(|e| anyhow!("Failed to decode statement: {}", e))?;
639654
let data = statement.data().ok_or_else(|| anyhow!("Statement missing data"))?;
640655
let session_key = sr25519::Public::from_raw(
@@ -686,7 +701,7 @@ impl Participant {
686701
for &(sender_idx, sender_session_key) in &pending {
687702
let subscription = self
688703
.rpc_client
689-
.subscribe::<Bytes>(
704+
.subscribe::<StatementEvent>(
690705
"statement_subscribeStatement",
691706
rpc_params![TopicFilter::MatchAll(
692707
vec![topic_message(), topic_pair(&sender_session_key, &own_session_key)]
@@ -702,13 +717,27 @@ impl Participant {
702717
let mut futures: FuturesUnordered<_> = subscriptions
703718
.into_iter()
704719
.map(|(sender_idx, mut subscription)| async move {
705-
let statement_bytes =
706-
timeout(Duration::from_secs(SUBSCRIBE_TIMEOUT_SECS), subscription.next())
707-
.await
708-
.map_err(|_| anyhow!("Timeout waiting for message"))?
709-
.ok_or_else(|| anyhow!("Subscription ended unexpectedly"))?
710-
.map_err(|e| anyhow!("Subscription error: {}", e))?;
711-
let statement = Statement::decode(&mut &statement_bytes[..])
720+
let mut batch;
721+
loop {
722+
let item =
723+
timeout(Duration::from_secs(SUBSCRIBE_TIMEOUT_SECS), subscription.next())
724+
.await
725+
.map_err(|_| anyhow!("Timeout waiting for message"))?
726+
.ok_or_else(|| anyhow!("Subscription ended unexpectedly"))?
727+
.map_err(|e| anyhow!("Subscription error: {}", e))?;
728+
batch = match item {
729+
StatementEvent::NewStatements { statements: batch, .. } => batch,
730+
};
731+
if batch.is_empty() {
732+
continue; // Ignore empty batches
733+
} else {
734+
break;
735+
}
736+
}
737+
if batch.len() != 1 {
738+
return Err(anyhow!("Expected exactly one statement, got: {}", batch.len()));
739+
}
740+
let statement = Statement::decode(&mut &batch.remove(0)[..])
712741
.map_err(|e| anyhow!("Failed to decode statement: {}", e))?;
713742
let data = statement.data().ok_or_else(|| anyhow!("Statement missing data"))?;
714743
let req = StatementMessage::decode(&mut &data[..])
@@ -929,7 +958,7 @@ async fn statement_store_latency_bench() -> Result<(), anyhow::Error> {
929958
let topic: Topic = blake2_256(topic_str.as_bytes()).into();
930959

931960
let subscription = rpc_client
932-
.subscribe::<Bytes>(
961+
.subscribe::<StatementEvent>(
933962
"statement_subscribeStatement",
934963
rpc_params![TopicFilter::MatchAll(
935964
vec![topic].try_into().expect("Single topic")
@@ -1008,7 +1037,9 @@ async fn statement_store_latency_bench() -> Result<(), anyhow::Error> {
10081037
.into_iter()
10091038
.map(|(msg_idx, topic_str, mut subscription)| async move {
10101039
match timeout(total_timeout, subscription.next()).await {
1011-
Ok(Some(Ok(_statement_bytes))) => Ok((msg_idx, topic_str)),
1040+
Ok(Some(Ok(StatementEvent::NewStatements { .. }))) => {
1041+
Ok((msg_idx, topic_str))
1042+
},
10121043
Ok(Some(Err(e))) => Err(anyhow!(
10131044
"Subscription error for message {}: {}",
10141045
msg_idx,

prdoc/pr_11139.prdoc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
title: 'make subscription return statement event instead of bytes'
2+
doc:
3+
- audience: Node Dev
4+
description: |
5+
Changes the statement subscription RPC to return `StatementEvent` instead of raw `Bytes`.
6+
When a subscription is initiated, the endpoint now sends NewStatements event batches
7+
with matching statements already in the store. If there are not statements in the store
8+
an empty batch is sent.
9+
10+
crates:
11+
- name: sp-statement-store
12+
bump: minor
13+
- name: sc-statement-store
14+
bump: minor
15+
- name: sc-rpc
16+
bump: minor
17+
- name: sc-rpc-api
18+
bump: minor

substrate/client/rpc-api/src/statement/mod.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
2121
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
2222
use sp_core::Bytes;
23-
use sp_statement_store::{SubmitResult, TopicFilter};
23+
use sp_statement_store::{StatementEvent, SubmitResult, TopicFilter};
2424

2525
pub mod error;
2626

@@ -37,14 +37,22 @@ pub trait StatementApi {
3737
///
3838
/// # Returns
3939
///
40-
/// Returns a stream of SCALE-encoded statements as `Bytes`.
41-
/// When a subscription is initiated the endpoint will immediately return the matching
42-
/// statements already in the store. Subsequent matching statements will be pushed to the client
43-
/// as they are added to the store.
40+
/// Returns a stream of `StatementEvent` values.
41+
/// When a subscription is initiated the endpoint will first return all matching statements
42+
/// already in the store in batches as `StatementEvent::NewStatements`.
43+
///
44+
/// NewStatements includes an Optional field `remaining` which indicates how many more
45+
/// statements are left to be sent in the initial batch of existing statements. The field
46+
/// guarantees to the client that it will receive at least this many more statements in the
47+
/// subscription stream, but it may receive more if new statements are added to the store that
48+
/// match the filter.
49+
///
50+
/// If there are no statements in the store matching the filter, an empty batch of statements
51+
/// is sent.
4452
#[subscription(
4553
name = "statement_subscribeStatement" => "statement_statement",
4654
unsubscribe = "statement_unsubscribeStatement",
47-
item = Bytes,
55+
item = StatementEvent,
4856
with_extensions,
4957
)]
5058
fn subscribe_statement(&self, topic_filter: TopicFilter);

substrate/client/rpc/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ workspace = true
1616
targets = ["x86_64-unknown-linux-gnu"]
1717

1818
[dependencies]
19+
async-channel = { workspace = true }
1920
codec = { workspace = true, default-features = true }
2021
futures = { workspace = true }
2122
jsonrpsee = { features = ["server"], workspace = true }
@@ -45,7 +46,6 @@ tokio = { workspace = true, default-features = true }
4546

4647
[dev-dependencies]
4748
assert_matches = { workspace = true }
48-
async-channel = { workspace = true }
4949
futures-timer = { workspace = true }
5050
pretty_assertions = { workspace = true }
5151
sc-keystore = { workspace = true }

substrate/client/rpc/src/statement/mod.rs

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,17 @@ use jsonrpsee::{
2727
/// Re-export the API for backward compatibility.
2828
pub use sc_rpc_api::statement::{error::Error, StatementApiServer};
2929
use sp_core::Bytes;
30-
use sp_statement_store::{OptimizedTopicFilter, StatementSource, SubmitResult, TopicFilter};
30+
use sp_statement_store::{
31+
OptimizedTopicFilter, StatementEvent, StatementSource, SubmitResult, TopicFilter,
32+
};
3133
use std::sync::Arc;
3234
const LOG_TARGET: &str = "statement-store-rpc";
35+
// The maximum size of a chunk of statements to send in a single JSON response. This is needed to
36+
// avoid hitting the maximum JSON size limit in the RPC response. Each statement is SCALE-encoded
37+
// and then hex-encoded in the JSON response, so the size of the JSON response is approximately 2x.
38+
// This value is chosen to be large enough to send a reasonable number of statements in a single
39+
// chunk, but small enough to avoid hitting the JSON size limit.
40+
const MAX_CHUNK_BYTES_LIMIT: usize = 4 * 1024 * 1024;
3341

3442
use crate::{
3543
utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
@@ -39,6 +47,56 @@ use crate::{
3947
#[cfg(test)]
4048
mod tests;
4149

50+
/// Send existing statements in chunks over the subscription channel.
51+
///
52+
/// Splits the statements into chunks that fit within [`MAX_CHUNK_BYTES_LIMIT`] to avoid
53+
/// exceeding the RPC max response size, then sends each chunk as a
54+
/// [`StatementEvent::NewStatements`].
55+
async fn send_in_chunks(
56+
existing_statements: Vec<Vec<u8>>,
57+
subscription_sender: async_channel::Sender<StatementEvent>,
58+
) {
59+
let mut iter = existing_statements.into_iter().peekable();
60+
loop {
61+
let mut chunk = Vec::<Bytes>::new();
62+
let mut chunk_json_size = 0usize;
63+
while let Some(statement) = iter.peek() {
64+
// Each SCALE-encoded byte becomes 2 hex chars in the JSON response
65+
let json_size_estimate = statement.len() * 2;
66+
// If a single statement exceeds the max chunk size, skip it but continue sending the
67+
// rest of the statements. This would never happen in practice because the statement
68+
// store should reject statements that are too large, but we add this check to be safe.
69+
if json_size_estimate > MAX_CHUNK_BYTES_LIMIT {
70+
iter.next();
71+
continue;
72+
}
73+
if chunk_json_size + json_size_estimate > MAX_CHUNK_BYTES_LIMIT {
74+
break;
75+
}
76+
let Some(statement) = iter.next() else { break };
77+
chunk_json_size += json_size_estimate;
78+
chunk.push(statement.into());
79+
}
80+
if chunk.is_empty() {
81+
break;
82+
}
83+
let remaining = iter.len();
84+
if let Err(e) = subscription_sender
85+
.send(StatementEvent::NewStatements {
86+
statements: chunk,
87+
remaining: Some(remaining as u32),
88+
})
89+
.await
90+
{
91+
log::warn!(
92+
target: LOG_TARGET,
93+
"Failed to send existing statement in subscription: {:?}", e
94+
);
95+
break;
96+
}
97+
}
98+
}
99+
42100
/// Trait alias for statement store API required by the RPC.
43101
pub trait StatementStoreApi:
44102
sp_statement_store::StatementStore + sc_statement_store::StatementStoreSubscriptionApi
@@ -103,22 +161,10 @@ impl StatementApiServer for StatementStore {
103161
.pipe_from_stream(subscription_stream, BoundedVecDeque::new(128)),
104162
);
105163

106-
// Send existing statements before returning, to make sure we did not miss any statements.
107164
self.executor.spawn(
108165
"statement-store-rpc-send",
109166
Some("rpc"),
110-
async move {
111-
for statement in existing_statements {
112-
if let Err(e) = subscription_sender.send(statement.into()).await {
113-
log::warn!(
114-
target: LOG_TARGET,
115-
"Failed to send existing statement in subscription: {:?}", e
116-
);
117-
break;
118-
}
119-
}
120-
}
121-
.boxed(),
167+
send_in_chunks(existing_statements, subscription_sender).boxed(),
122168
)
123169
}
124170
}

0 commit comments

Comments
 (0)