Conversation
I was removing the rewrite of the URI from `ws(s)` to `http(s)`, but I forgot to change `HttpClient` to `WsClient`. This PR fixes that.
…ternalities-stuff
Live mode works with either ws(s):// or http(s):// in --uri. The endpoint is converted to HTTP for state download and to WS for block/header RPC. This is a temporary workaround until frame-remote-externalities restores proper WS transport support in a future release; see paritytech/polkadot-sdk#10258, paritytech/polkadot-sdk#10766, and paritytech/polkadot-sdk#10779.
ggwpez
left a comment
Do you have a branch with the patched CLI to try this or do you just use the test?
```rust
WsClientBuilder::default()
	.max_request_size(u32::MAX)
	.max_response_size(u32::MAX)
	.request_timeout(std::time::Duration::from_secs(60 * 5))
```
```rust
let last_key_bytes = last_key.as_ref();

// Find the first byte position where the two keys diverge
let divergence_pos = second_last_bytes
```
Is it important to always return 16 ranges here? Otherwise the function could probably be simplified by operating solely on nibbles instead of a mix between nibbles and bytes.
Or maybe it could be a recursive function which just interpolates between two keys and returns two ranges. Then you can call that recursively.
Currently in the test subdivide_creates_continuation_ranges:

```rust
let prefix = key(&[0xAB]);
let second_last = key(&[0xAB, 0x12, 0x34]);
let last = key(&[0xAB, 0x12, 0x35]);
```

It returns the start keys AB 12 35, AB 12 40, AB 12 50, ..., which ignore the common nibble prefix AB 12 3.
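For illustration, a self-contained sketch (hypothetical names, not the PR's actual code) of the byte-level divergence search described above; it shows why the shared high nibble of 0x34/0x35 is never seen at byte granularity:

```rust
// Hypothetical stand-in for the divergence search: the index of the first
// byte at which two storage keys differ.
fn divergence_pos(a: &[u8], b: &[u8]) -> usize {
    a.iter()
        .zip(b.iter())
        .position(|(x, y)| x != y)
        .unwrap_or_else(|| a.len().min(b.len()))
}

fn main() {
    // 0x34 vs 0x35 diverge at byte index 2, but only in the low nibble;
    // the high nibble `3` is still shared, which is the common prefix the
    // comment above says gets ignored.
    assert_eq!(divergence_pos(&[0xAB, 0x12, 0x34], &[0xAB, 0x12, 0x35]), 2);
}
```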
```rust
self.conn_manager
	.as_ref()
	.map(|cm| cm.num_clients() * Self::PARALLEL_REQUESTS_PER_CLIENT)
	.unwrap_or(Self::PARALLEL_REQUESTS_PER_CLIENT)
```
```diff
-	.unwrap_or(Self::PARALLEL_REQUESTS_PER_CLIENT)
+	.expect("connection manager must be initialized; qed")
```
I don't think it makes sense to call this earlier.
```rust
for i in 0..conn_manager.num_clients() {
	let client = conn_manager.get(i).await;
	let result = with_timeout(
```
This boilerplate could go into an any_client function; I've seen it three or four times now.
But I know that Claude loves to just copy-paste stuff 😆
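A synchronous sketch of what such an `any_client` helper could look like (the real code is async and adds timeouts; all names here are illustrative):

```rust
// Hypothetical helper that tries each client in turn and returns the first
// successful result, folding the repeated loop into one place.
fn any_client<T, E: std::fmt::Debug>(
    num_clients: usize,
    mut try_one: impl FnMut(usize) -> Result<T, E>,
) -> Result<T, ()> {
    for i in 0..num_clients {
        match try_one(i) {
            Ok(v) => return Ok(v),
            Err(e) => eprintln!("client {i} failed: {e:?}"),
        }
    }
    Err(())
}

fn main() {
    // Clients 0 and 1 fail; client 2 answers.
    let result = any_client(3, |i| if i < 2 { Err("down") } else { Ok(i) });
    assert_eq!(result, Ok(2));
}
```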
```rust
let key_count = sp_io::storage::next_key(&[])
	.map(|first_key| {
		let mut count = 1;
		let mut current = first_key;
		while let Some(next) = sp_io::storage::next_key(&current) {
			count += 1;
			current = next;
		}
		count
	})
	.unwrap_or(0);
```
```diff
-let key_count = sp_io::storage::next_key(&[])
-	.map(|first_key| {
-		let mut count = 1;
-		let mut current = first_key;
-		while let Some(next) = sp_io::storage::next_key(&current) {
-			count += 1;
-			current = next;
-		}
-		count
-	})
-	.unwrap_or(0);
+let key_count =
+	frame_support::storage::KeyPrefixIterator::new(vec![], vec![], |_| Ok(())).count();
```
I think frame-support should be fine as a dev dependency.
```rust
active_workers.fetch_sub(1, Ordering::SeqCst);
is_active = false;
```
Can this underflow if two threads call `pop_front`, but the one that got `None` is faster than the one that got `Some(..)`?
I think this could be a race.
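One way to reason about it: if the decrement is guarded by a per-worker `is_active` flag, only a worker that previously counted itself active ever decrements, so a repeated decrement from one worker cannot underflow the counter. A minimal sketch of that guard, with illustrative names:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// A worker only decrements if it had previously marked itself active, so
// calling this twice in a row is a no-op the second time.
fn deactivate(active_workers: &AtomicUsize, is_active: &mut bool) {
    if *is_active {
        active_workers.fetch_sub(1, Ordering::SeqCst);
        *is_active = false;
    }
}

fn main() {
    let active = AtomicUsize::new(1);
    let mut is_active = true;
    deactivate(&active, &mut is_active);
    deactivate(&active, &mut is_active); // second call does nothing
    assert_eq!(active.load(Ordering::SeqCst), 0);
}
```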
```rust
	is_active = false;
}

sleep(Duration::from_millis(100)).await;
```
Otherwise you have some busy loop when there is no more work to fetch.
```rust
let queue_len = work_queue.lock().unwrap().len();
let active = active_workers.load(Ordering::SeqCst);

if queue_len == 0 && active == 0 {
```
Why does it wait for active == 0 here?
Don't want to annoy you, but if this work_queue were a channel, then every thread could try to read and abort if nothing is read, which distributes the work equally and stops the threads when no more work is left.
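A minimal sketch of that channel variant using std's mpsc (all names hypothetical): a worker blocks on `recv()` and exits once every sender is dropped and the queue is drained, so no polling loop or active-worker counter is needed.

```rust
use std::sync::mpsc;
use std::thread;

// Drain work items from the channel until it is closed and empty.
// recv() returns Err exactly when all senders are gone and no items remain.
fn drain_sum(rx: mpsc::Receiver<u32>) -> u32 {
    let mut sum = 0;
    while let Ok(item) = rx.recv() {
        sum += item;
    }
    sum
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 1..=4 {
        tx.send(i).unwrap();
    }
    drop(tx); // signals "no more work will be added"
    let worker = thread::spawn(move || drain_sum(rx));
    assert_eq!(worker.join().unwrap(), 10);
}
```

Note this sketch does not cover the case where failed work is put back after the senders are gone, which is exactly the objection raised in the reply.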
The point being that work may be added back. When a client fails around the end of the work but all other workers have already shut down, we may land in some kind of deadlock, because the dead worker may have blocked us and we will not be able to reconnect for some time. So, we wait until all are finished.
/cmd fmt |
```rust
///
/// Creates ranges by appending one nibble (4 bits) to the prefix.
/// This gives us: 0x00, 0x10, 0x20, ..., 0xF0
pub(crate) fn gen_key_ranges(prefix: &StorageKey) -> Vec<KeyRange> {
```
This excludes the key at the prefix itself, e.g. for prefix "" the key at the root is excluded, as we only start with keys at 0x00.
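A standalone sketch of the range generation the doc comment describes (hypothetical `range_starts`, not the actual `gen_key_ranges`); it also demonstrates the exclusion: the bare prefix sorts strictly before the first start key `prefix ++ 0x00`.

```rust
// Produce the 16 start keys obtained by appending one nibble to the prefix
// (as the high nibble of a new byte): prefix ++ 0x00, 0x10, ..., 0xF0.
fn range_starts(prefix: &[u8]) -> Vec<Vec<u8>> {
    (0u8..16)
        .map(|nibble| {
            let mut key = prefix.to_vec();
            key.push(nibble << 4);
            key
        })
        .collect()
}

fn main() {
    let starts = range_starts(&[0xAB]);
    assert_eq!(starts.len(), 16);
    assert_eq!(starts[0], vec![0xAB_u8, 0x00]);
    assert_eq!(starts[15], vec![0xAB_u8, 0xF0]);
    // The key equal to the bare prefix falls before every range start,
    // which is the exclusion pointed out above.
    assert!(vec![0xAB_u8] < starts[0]);
}
```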
```rust
	Self { start_key, end_key, prefix, page_size: DEFAULT_PAGE_SIZE, exclude_start_key: true }
}

/// Returns a new KeyRange with halved page size (minimum 10).
```
A minimum of 10 sounds rather small. I guess some content may be ten one-meg values, but then I would consider a minimum of 1. It is not easy to find a good rationale here: it should be related to data size, but that size is only known by the server, while control over the batch size sits with the client. Allowing the server to politely ask for a smaller batch size could also make sense. Certainly not something needed at this point.
```rust
/// This prevents infinite subdivision when continuation ranges filter out the start_key.
pub(crate) fn filter_keys(&self, keys: Vec<StorageKey>) -> (Vec<StorageKey>, bool) {
	let rpc_returned_full_batch = keys.len() == self.page_size as usize;
```
Not sure if we have a good BitVec implementation somewhere to replace this Vec.
I should have put the comment one line below: `let filtered: Vec<_> = keys`
```rust
#[derive(Debug, Clone)]
pub(crate) struct Client {
	pub(crate) ws_client: Arc<WsClient>,
	pub(crate) version: u64,
```
I'm finding "version" confusing; it may be just me, but I was expecting some versioning shared between client and server, or something related to a change of implementation. Is it just a monotonic client counter? Something like "current" or "ongoing_connection_number"?
I had to reread this a few times. From what I understood,

> Is it just a monotonic client counter?

this ⬆️ is the case.

`fn recreate_client` is idempotent because the counter is monotonic, as you said:

- a processor gets a client, started on a work item
- it dies or stalls, and returns `ProcessResult::Retry`
- `recreate_client` is called (if the appropriate flag is set)
- the client is recreated with `version` incremented by 1

As I parsed this, if called twice (or more) on the same stale client, only the first call will take effect.
> Is it just a monotonic client counter?

Yes, this. We share the client between different worker threads. If one worker fails, the others will also fail (because they are using the same WS connection). To ensure that we do not recreate the same client multiple times, everyone has this version. Only if the version matches will we recreate it; the other threads can just use the already recreated client.
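A minimal single-threaded sketch of that version check (illustrative names; the real code holds the clients behind the connection manager and actually reconnects):

```rust
use std::sync::Mutex;

#[derive(Clone, Debug, PartialEq)]
struct Client {
    version: u64,
}

struct Slot {
    current: Mutex<Client>,
}

impl Slot {
    // Returns true if this call actually recreated the client. A second
    // worker reporting the same stale client sees the bumped version and
    // skips the reconnect, reusing the already recreated client instead.
    fn recreate_if_current(&self, failed: &Client) -> bool {
        let mut current = self.current.lock().unwrap();
        if current.version != failed.version {
            return false; // someone else already recreated it
        }
        // (re)connect would happen here
        current.version += 1;
        true
    }
}

fn main() {
    let slot = Slot { current: Mutex::new(Client { version: 0 }) };
    let stale = Client { version: 0 };
    assert!(slot.recreate_if_current(&stale)); // first failure reconnects
    assert!(!slot.recreate_if_current(&stale)); // duplicate report is a no-op
}
```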
```rust
	}
	Some(w)
},
None => {
```
I am not too sure: if a client goes inactive, then active_workers changes, but the connection manager keeps the same number of clients, so the work item will be tried again on the same bad client?
```rust
E: std::fmt::Debug,
{
	let conn_manager = self.conn_manager().map_err(|_| ())?;
	for i in 0..conn_manager.num_clients() {
```
Could it make sense to start with a different offset every time? (To avoid staying on a single ok-ish client 0, and to distribute load better.)
> Could it make sense to start with a different offset every time?

+1
Possible load imbalance from fixed iteration order. Perhaps as an alternative, a start pointer that rotates, or just selecting at random (easier).
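A sketch of the rotating start pointer variant (names are made up, not the PR's code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Each call starts the probe order at the next offset, so repeated calls
// spread the first attempt across clients instead of always hitting client 0.
fn rotated_order(next_start: &AtomicUsize, num_clients: usize) -> Vec<usize> {
    let start = next_start.fetch_add(1, Ordering::Relaxed) % num_clients;
    (0..num_clients).map(|i| (start + i) % num_clients).collect()
}

fn main() {
    let counter = AtomicUsize::new(0);
    assert_eq!(rotated_order(&counter, 3), vec![0, 1, 2]);
    assert_eq!(rotated_order(&counter, 3), vec![1, 2, 0]);
    assert_eq!(rotated_order(&counter, 3), vec![2, 0, 1]);
}
```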
```rust
let batches: VecDeque<_> = payloads
	.chunks(BATCH_SIZE)
	.enumerate()
	.map(|(i, chunk)| (i * BATCH_SIZE, chunk.to_vec(), BATCH_SIZE))
```
Is it possible to build this directly (without instantiating `payloads`)?
```rust
/// Subdivide the key space AFTER the last_key into up to 16 new ranges.
///
/// Takes the last two keys from a batch to find where they diverge, then creates
/// ranges based on incrementing the nibble at the divergence point.
```
This function does not look simple to me. I am wondering why we could not just split a failing work item at once (we batch by 16, so we are running a bit-aligned split; as long as we split by a power of 2, we can just append some key prefix with an attached bit depth).
I cannot follow what you are writing; please explain it better. Also, this function is not used for failing work items.
IIUC, you reduce the size of the batch on Retry, then after this item is processed, you use this function to process the remaining data from the initial batch.
I was wondering if it would be more straightforward to just split the batch, pass the split batch to retry, queue it somehow, and not have this method.
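The alternative being suggested could be as simple as this (illustrative sketch under the reviewer's assumption, not the PR's subdivision logic):

```rust
// Split the remaining work in two and queue both halves, instead of
// interpolating new key ranges from the last two keys of a batch.
fn split_batch<T: Clone>(batch: &[T]) -> (Vec<T>, Vec<T>) {
    let mid = batch.len() / 2;
    (batch[..mid].to_vec(), batch[mid..].to_vec())
}

fn main() {
    let (left, right) = split_batch(&[1, 2, 3, 4, 5]);
    assert_eq!(left, vec![1, 2]);
    assert_eq!(right, vec![3, 4, 5]);
}
```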
```rust
}

/// Called when a request fails. Triggers client recreation if version matches.
pub(crate) async fn recreate_client(&self, worker_index: usize, failed: Client) {
```
Wondering if 'recreate client' should use a different/new client URI: here, in case of a lost client, are things blocked?
Disconnected, offline. I should have said server (even if it is a WsClient).
We already connect to all provided servers, so we can not really switch.
IIUC, the range to import is split between all workers across the servers, using a modulo of the number of workers (parallel). Can a work item be requested from another server if one becomes unreachable? (From the current code, the mapping of work items to batches seems pretty static; the connection manager's number of peers does not change, I think.)
Work items are put back into the list and then some other worker will pick them up. So yes, the work will be moved to another server.
rockbmb
left a comment
This PR has serious depth; I will return for another look, especially at the connection manager test.
```rust
WsClientBuilder::default()
	.max_request_size(u32::MAX)
	.max_response_size(u32::MAX)
	.request_timeout(std::time::Duration::from_secs(60 * 5))
```
Could you make this a const in the beginning of the module, like you did for RPC_TIMEOUT?
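Something along these lines, mirroring the existing RPC_TIMEOUT const (values copied from the snippet; the names are suggestions, not the PR's actual identifiers):

```rust
use std::time::Duration;

// Suggested module-level consts for the WsClientBuilder settings above.
const MAX_REQUEST_SIZE: u32 = u32::MAX;
const MAX_RESPONSE_SIZE: u32 = u32::MAX;
const REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 5);

fn main() {
    assert_eq!(MAX_REQUEST_SIZE, u32::MAX);
    assert_eq!(MAX_RESPONSE_SIZE, u32::MAX);
    assert_eq!(REQUEST_TIMEOUT.as_secs(), 300);
}
```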
cheme
left a comment
Guess randomization or a rolling choice of the first peer (https://github.com/paritytech/polkadot-sdk/pull/10779/files#r2696109976) would be good, but it is not strictly needed yet.
…ternalities-stuff' into bkchr-new-remote-externalities-stuff
All GitHub workflows were cancelled due to the failure of one of the required jobs.
…nism (paritytech#10866)

# Description

Follow-up to paritytech#10766, which was closed in favor of paritytech#10779. Rework remote externality child key test; it was failing since the proxy pallet has no child storages.

## Integration

N/A

# Checklist

* [x] My PR includes a detailed description as outlined in the "Description" and its two subsections above.
* [x] My PR follows the [labeling requirements](https://github.com/paritytech/polkadot-sdk/blob/master/docs/contributor/CONTRIBUTING.md#Process) of this project (at minimum one label for `T` required)
  * External contributors: Use `/cmd label <label-name>` to add labels
  * Maintainers can also add labels manually
* [x] I have made corresponding changes to the documentation (if applicable)
* [x] I have added tests that prove my fix is effective or that my feature works (if applicable)
This is a major refactoring of `remote-externalities` to improve the download speed of the chain state. This is mainly achieved by downloading keys + values from multiple RPC servers in parallel. The key downloading is also done more smartly, by dividing the downloaded key ranges dynamically instead of having a fixed number of key ranges at startup. Besides this, it does a lot more refactoring + clean-ups.

All in all, this brings down the download time for PAH from 2h+ to 15min with ~5 RPC servers.