Skip to content

Commit 9c1d9f1

Browse files
authored
fix(p2p): better batch connection sampling (#13674)
## Overview Improves batch connection sampling such that we can increase the retry attempts for prover nodes requesting transaction information from their peers. ### Core improvements **Previously** When making a batch request, we requested the peer list, iterated through it until we found a peer without an reqresp active connection and used those. This meant that we were commonly only using the first 3-4 peers in the peer list. Now it will do random sampling without replacement to peers without already active reqresp connections. If there are non available, then we will just sample from peers already servicing requests.
1 parent c55088e commit 9c1d9f1

File tree

6 files changed

+123
-68
lines changed

6 files changed

+123
-68
lines changed

yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ describe('BatchConnectionSampler', () => {
2222

2323
// Mock libp2p to return our test peers
2424
libp2p = {
25-
getPeers: jest.fn().mockReturnValue(peers),
25+
getPeers: jest.fn().mockImplementation(() => [...peers]),
2626
} as unknown as jest.Mocked<Libp2p>;
2727

2828
// Create a real connection sampler with mocked random sampling
@@ -45,8 +45,7 @@ describe('BatchConnectionSampler', () => {
4545

4646
it('assigns requests to peers deterministically with wraparound', () => {
4747
// Mock to return first two peers
48-
let callCount = 0;
49-
mockRandomSampler.random.mockImplementation(() => callCount++ % 2);
48+
mockRandomSampler.random.mockImplementation(() => 0);
5049

5150
// With 5 requests and 2 peers:
5251
// floor(5/2) = 2 requests per peer
@@ -66,9 +65,7 @@ describe('BatchConnectionSampler', () => {
6665
});
6766

6867
it('handles peer removal and replacement', () => {
69-
mockRandomSampler.random.mockImplementation(_ => {
70-
return 2; // Return index 2 for replacement peer
71-
});
68+
mockRandomSampler.random.mockImplementation(_ => 0);
7269

7370
// With 4 requests and 2 peers:
7471
// floor(4/2) = 2 requests per peer
@@ -80,6 +77,8 @@ describe('BatchConnectionSampler', () => {
8077
const initialPeer = sampler.getPeerForRequest(0);
8178
expect(initialPeer).toBe(peers[0]);
8279

80+
// Mock random to return the third peer
81+
mockRandomSampler.random.mockImplementation(_ => 2);
8382
sampler.removePeerAndReplace(peers[0]);
8483

8584
// After replacement:
@@ -94,7 +93,7 @@ describe('BatchConnectionSampler', () => {
9493
});
9594

9695
it('handles peer removal and replacement - no replacement available', () => {
97-
mockRandomSampler.random.mockImplementation(() => 2);
96+
mockRandomSampler.random.mockImplementation(() => 0);
9897
const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 4, /* maxPeers */ 2);
9998

10099
expect(sampler.activePeerCount).toBe(2);
@@ -112,13 +111,7 @@ describe('BatchConnectionSampler', () => {
112111
});
113112

114113
it('distributes requests according to documentation example', () => {
115-
let callCount = 0;
116-
mockRandomSampler.random.mockImplementation(() => {
117-
if (callCount < 3) {
118-
return callCount++;
119-
}
120-
return 0;
121-
});
114+
mockRandomSampler.random.mockImplementation(() => 0);
122115

123116
// Example from doc comment:
124117
// Peers: [P1] [P2] [P3]
@@ -146,8 +139,7 @@ describe('BatchConnectionSampler', () => {
146139
});
147140

148141
it('same number of requests per peers', () => {
149-
let callCount = 0;
150-
mockRandomSampler.random.mockImplementation(() => callCount++ % 2);
142+
mockRandomSampler.random.mockImplementation(() => 0);
151143

152144
const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 2, /* maxPeers */ 2);
153145
expect(sampler.requestsPerBucket).toBe(1);
@@ -165,10 +157,9 @@ describe('BatchConnectionSampler', () => {
165157
expect(sampler.activePeerCount).toBe(0);
166158
expect(sampler.getPeerForRequest(0)).toBeUndefined();
167159

168-
let i = 0;
169-
mockRandomSampler.random.mockImplementation(() => i++ % 3);
160+
mockRandomSampler.random.mockImplementation(() => 0);
170161

171-
libp2p.getPeers.mockReturnValue(peers);
162+
libp2p.getPeers.mockImplementation(() => [...peers]);
172163
const samplerWithMorePeers = new BatchConnectionSampler(connectionSampler, /* batchSize */ 2, /* maxPeers */ 3);
173164
expect(samplerWithMorePeers.requestsPerBucket).toBe(1); // floor(2/3) = 0
174165
// First two requests go to first two peers

yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,11 @@ export class BatchConnectionSampler {
7070

7171
if (newPeer) {
7272
this.batch[index] = newPeer;
73-
this.logger.trace(`Replaced peer ${peerId} with ${newPeer}`, { peerId, newPeer });
73+
this.logger.trace('Replaced peer', { peerId, newPeer });
7474
} else {
7575
// If we couldn't get a replacement, remove the peer and compact the array
7676
this.batch.splice(index, 1);
77-
this.logger.trace(`Removed peer ${peerId}`, { peerId });
77+
this.logger.trace('Removed peer', { peerId });
7878
}
7979
}
8080

yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ describe('ConnectionSampler', () => {
2020

2121
// Mock libp2p
2222
mockLibp2p = {
23-
getPeers: jest.fn().mockReturnValue([...peers]),
23+
getPeers: jest.fn().mockImplementation(() => [...peers]),
2424
dialProtocol: jest.fn(),
2525
};
2626

@@ -196,15 +196,28 @@ describe('ConnectionSampler', () => {
196196

197197
// Mock libp2p
198198
mockLibp2p = {
199-
getPeers: jest.fn().mockReturnValue(peers),
199+
getPeers: jest.fn().mockImplementation(() => [...peers]),
200200
dialProtocol: jest.fn(),
201201
};
202202

203203
mockRandomSampler = mock<RandomSampler>();
204+
mockRandomSampler.random.mockReturnValue(0);
204205
sampler = new ConnectionSampler(mockLibp2p, 1000, mockRandomSampler);
205206
});
206207

208+
it('should only return samples as many peers as available', () => {
209+
const sampledPeers = sampler.samplePeersBatch(100);
210+
211+
expect(sampledPeers).toHaveLength(peers.length);
212+
});
213+
207214
it('prioritizes peers without active connections', () => {
215+
mockRandomSampler.random
216+
// Will pick the peers with active connections
217+
.mockReturnValueOnce(3)
218+
.mockReturnValueOnce(3)
219+
.mockReturnValue(0);
220+
208221
// Set up some peers with active connections
209222
sampler['activeConnectionsCount'].set(peers[3], 1);
210223
sampler['activeConnectionsCount'].set(peers[4], 2);
@@ -248,7 +261,8 @@ describe('ConnectionSampler', () => {
248261
const sampledPeers = sampler.samplePeersBatch(3);
249262

250263
expect(sampledPeers).toHaveLength(3);
251-
expect(sampledPeers).toEqual(expect.arrayContaining([peers[0], peers[1], peers[2]]));
264+
// The last one will be picked first, then the first one, then the second one
265+
expect(sampledPeers).toEqual(expect.arrayContaining([peers[4], peers[0], peers[1]]));
252266
});
253267

254268
it('handles case when fewer peers available than requested', () => {

yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts

Lines changed: 82 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ export class RandomSampler {
1717
}
1818

1919
/**
20-
* A class that samples peers from the libp2p node and returns a peer that we don't already have a connection open to.
20+
* A class that samples peers from the libp2p node and returns a peer that we don't already have a reqresp connection open to.
2121
* If we already have a connection open, we try to sample a different peer.
2222
* We do this MAX_SAMPLE_ATTEMPTS times, if we still don't find a peer we just go for it.
2323
*
@@ -69,32 +69,65 @@ export class ConnectionSampler {
6969
getPeer(excluding?: Map<string, boolean>): PeerId | undefined {
7070
// In libp2p getPeers performs a shallow copy, so this array can be sliced from safetly
7171
const peers = this.libp2p.getPeers();
72+
const { peer } = this.getPeerFromList(peers, excluding);
73+
return peer;
74+
}
7275

76+
/**
77+
* Samples a peer from a list of peers, excluding those that have active (reqresp) connections or are in the exclusion list
78+
*
79+
* @param peers - The list of peers to sample from
80+
* @param excluding - The peers to exclude from the sampling
81+
* @returns - A peer from the list, or undefined if no peers are available,
82+
* - a boolean indicating if the peer has active connections, and
83+
* - all sampled peers - to enable optional resampling
84+
*
85+
* @dev The provided list peers, should be mutated by this function. This allows batch sampling
86+
* to be performed without making extra copies of the list.
87+
*/
88+
getPeerFromList(
89+
peers: PeerId[],
90+
excluding?: Map<string, boolean>,
91+
): {
92+
peer: PeerId | undefined;
93+
sampledPeers: PeerId[];
94+
} {
7395
if (peers.length === 0) {
74-
return undefined;
96+
return { peer: undefined, sampledPeers: [] };
7597
}
7698

77-
let randomIndex = this.sampler.random(peers.length);
78-
let attempts = 0;
79-
80-
// Keep sampling while:
81-
// - we haven't exceeded max attempts AND
82-
// - either the peer has active connections OR is in the exclusion list
83-
while (
84-
attempts < MAX_SAMPLE_ATTEMPTS &&
85-
((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 ||
86-
(excluding?.get(peers[randomIndex]?.toString()) ?? false))
87-
) {
99+
const sampledPeers: PeerId[] = [];
100+
// Try to find a peer that has no active connections and is not in the exclusion list
101+
for (let attempts = 0; attempts < MAX_SAMPLE_ATTEMPTS && peers.length > 0; attempts++) {
102+
const randomIndex = this.sampler.random(peers.length);
103+
const peer = peers[randomIndex];
104+
const hasActiveConnections = (this.activeConnectionsCount.get(peer) ?? 0) > 0;
105+
const isExcluded = excluding?.get(peer.toString()) ?? false;
106+
107+
// Remove this peer from consideration
88108
peers.splice(randomIndex, 1);
89-
randomIndex = this.sampler.random(peers.length);
90-
attempts++;
109+
110+
// If peer is suitable (no active connections and not excluded), return it
111+
if (!hasActiveConnections && !isExcluded) {
112+
this.logger.trace('Sampled peer', {
113+
attempts,
114+
peer,
115+
});
116+
return { peer, sampledPeers };
117+
}
118+
119+
// Keep track of peers that have active reqresp channels, batch sampling will use these to resample
120+
sampledPeers.push(peer);
91121
}
92122

93-
this.logger.trace(`Sampled peer in ${attempts} attempts`, {
94-
attempts,
95-
peer: peers[randomIndex]?.toString(),
123+
// If we've exhausted our attempts or peers list is empty, return the last peer if available
124+
const lastPeer = peers.length > 0 ? peers[this.sampler.random(peers.length)] : undefined;
125+
126+
this.logger.trace('Sampled peer', {
127+
attempts: MAX_SAMPLE_ATTEMPTS,
128+
peer: lastPeer?.toString(),
96129
});
97-
return peers[randomIndex];
130+
return { peer: lastPeer, sampledPeers };
98131
}
99132

100133
/**
@@ -105,34 +138,41 @@ export class ConnectionSampler {
105138
*/
106139
samplePeersBatch(numberToSample: number): PeerId[] {
107140
const peers = this.libp2p.getPeers();
108-
const sampledPeers: PeerId[] = [];
109-
const peersWithConnections: PeerId[] = []; // Hold onto peers with active connections incase we need to sample more
141+
this.logger.debug('Sampling peers batch', { numberToSample, peers });
110142

111-
for (const peer of peers) {
112-
const activeConnections = this.activeConnectionsCount.get(peer) ?? 0;
113-
if (activeConnections === 0) {
114-
if (sampledPeers.push(peer) === numberToSample) {
115-
return sampledPeers;
116-
}
117-
} else {
118-
peersWithConnections.push(peer);
143+
// Only sample as many peers as we have available
144+
numberToSample = Math.min(numberToSample, peers.length);
145+
146+
const batch: PeerId[] = [];
147+
const withActiveConnections: Set<PeerId> = new Set();
148+
for (let i = 0; i < numberToSample; i++) {
149+
const { peer, sampledPeers } = this.getPeerFromList(peers, undefined);
150+
if (peer) {
151+
batch.push(peer);
152+
}
153+
if (sampledPeers.length > 0) {
154+
sampledPeers.forEach(peer => withActiveConnections.add(peer));
119155
}
120156
}
157+
const lengthWithoutConnections = batch.length;
121158

122159
// If we still need more peers, sample from those with connections
123-
while (sampledPeers.length < numberToSample && peersWithConnections.length > 0) {
124-
const randomIndex = this.sampler.random(peersWithConnections.length);
125-
const [peer] = peersWithConnections.splice(randomIndex, 1);
126-
sampledPeers.push(peer);
160+
while (batch.length < numberToSample && withActiveConnections.size > 0) {
161+
const randomIndex = this.sampler.random(withActiveConnections.size);
162+
163+
const peer = Array.from(withActiveConnections)[randomIndex];
164+
withActiveConnections.delete(peer);
165+
batch.push(peer);
127166
}
128167

129-
this.logger.trace(`Batch sampled ${sampledPeers.length} unique peers`, {
130-
peers: sampledPeers,
131-
withoutConnections: sampledPeers.length - peersWithConnections.length,
132-
withConnections: peersWithConnections.length,
168+
this.logger.trace('Batch sampled peers', {
169+
length: batch.length,
170+
peers: batch,
171+
withoutConnections: lengthWithoutConnections,
172+
withConnections: numberToSample - lengthWithoutConnections,
133173
});
134174

135-
return sampledPeers;
175+
return batch;
136176
}
137177

138178
// Set of passthrough functions to keep track of active connections
@@ -155,8 +195,9 @@ export class ConnectionSampler {
155195
const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 0) + 1;
156196
this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount);
157197

158-
this.logger.trace(`Dialed protocol ${protocol} with peer ${peerId.toString()}`, {
198+
this.logger.trace('Dialed protocol', {
159199
streamId: stream.id,
200+
protocol,
160201
peerId: peerId.toString(),
161202
activeConnectionsCount: updatedActiveConnectionsCount,
162203
});
@@ -181,7 +222,7 @@ export class ConnectionSampler {
181222
const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 1) - 1;
182223
this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount);
183224

184-
this.logger.trace(`Closing connection to peer ${peerId.toString()}`, {
225+
this.logger.trace('Closing connection', {
185226
streamId,
186227
peerId: peerId.toString(),
187228
protocol: stream.protocol,
@@ -207,7 +248,7 @@ export class ConnectionSampler {
207248
// Check if we have lost track of accounting
208249
if (this.activeConnectionsCount.get(peerId) === 0) {
209250
await this.close(streamId);
210-
this.logger.debug(`Cleaned up stale connection ${streamId} to peer ${peerId.toString()}`);
251+
this.logger.debug('Cleaned up stale connection', { streamId, peerId: peerId.toString() });
211252
}
212253
} catch (error) {
213254
this.logger.error(`Error cleaning up stale connection ${streamId}`, { error });

yarn-project/p2p/src/services/reqresp/reqresp.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ export class ReqResp {
196196
this.logger.trace(`Sending request to peer: ${peer.toString()}`);
197197
const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffer);
198198

199-
if (response && response.status !== ReqRespStatus.SUCCESS) {
199+
if (response.status !== ReqRespStatus.SUCCESS) {
200200
this.logger.debug(
201201
`Request to peer ${peer.toString()} failed with status ${prettyPrintReqRespStatus(response.status)}`,
202202
);
@@ -325,7 +325,7 @@ export class ReqResp {
325325
const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffers[index]);
326326

327327
// Check the status of the response buffer
328-
if (response && response.status !== ReqRespStatus.SUCCESS) {
328+
if (response.status !== ReqRespStatus.SUCCESS) {
329329
this.logger.debug(
330330
`Request to peer ${peer.toString()} failed with status ${prettyPrintReqRespStatus(
331331
response.status,
@@ -421,7 +421,7 @@ export class ReqResp {
421421
peerId: PeerId,
422422
subProtocol: ReqRespSubProtocol,
423423
payload: Buffer,
424-
): Promise<ReqRespResponse | undefined> {
424+
): Promise<ReqRespResponse> {
425425
let stream: Stream | undefined;
426426
try {
427427
this.metrics.recordRequestSent(subProtocol);
@@ -439,6 +439,12 @@ export class ReqResp {
439439
} catch (e: any) {
440440
this.metrics.recordRequestError(subProtocol);
441441
this.handleResponseError(e, peerId, subProtocol);
442+
443+
// If there is an exception, we return an unknown response
444+
return {
445+
status: ReqRespStatus.FAILURE,
446+
data: Buffer.from([]),
447+
};
442448
} finally {
443449
// Only close the stream if we created it
444450
if (stream) {

yarn-project/p2p/src/services/reqresp/status.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export enum ReqRespStatus {
55
SUCCESS = 0,
66
RATE_LIMIT_EXCEEDED = 1,
77
BADLY_FORMED_REQUEST = 2,
8+
FAILURE = 126,
89
UNKNOWN = 127,
910
}
1011

@@ -53,6 +54,8 @@ export function prettyPrintReqRespStatus(status: ReqRespStatus) {
5354
return 'RATE_LIMIT_EXCEEDED';
5455
case ReqRespStatus.BADLY_FORMED_REQUEST:
5556
return 'BADLY_FORMED_REQUEST';
57+
case ReqRespStatus.FAILURE:
58+
return 'FAILURE';
5659
case ReqRespStatus.UNKNOWN:
5760
return 'UNKNOWN';
5861
}

0 commit comments

Comments
 (0)