Skip to content

Commit 8e2a3c9

Browse files
authored
fix(p2p): reqresp types + batch request tx pool filtering (#13666)
## Overview Incorrect typing in the reqresp module lead undefined results to NOT be filtered when being added to the tx pool
1 parent 6bc34a1 commit 8e2a3c9

File tree

6 files changed

+71
-11
lines changed

6 files changed

+71
-11
lines changed

yarn-project/p2p/src/client/p2p_client.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,60 @@ describe('In-Memory P2P Client', () => {
134134
await client.stop();
135135
});
136136

137+
it('request transactions handles missing items', async () => {
138+
// Mock a batch response that returns undefined items
139+
const mockTx1 = await mockTx();
140+
const mockTx2 = await mockTx();
141+
p2pService.sendBatchRequest.mockResolvedValue([mockTx1, undefined, mockTx2]);
142+
143+
// Spy on the tx pool addTxs method, it should not be called for the missing tx
144+
const addTxsSpy = jest.spyOn(txPool, 'addTxs');
145+
146+
await client.start();
147+
148+
const missingTxHash = (await mockTx()).getTxHash();
149+
const query = await Promise.all([mockTx1.getTxHash(), missingTxHash, mockTx2.getTxHash()]);
150+
const results = await client.requestTxsByHash(query);
151+
152+
expect(results).toEqual([mockTx1, undefined, mockTx2]);
153+
154+
expect(addTxsSpy).toHaveBeenCalledTimes(1);
155+
expect(addTxsSpy).toHaveBeenCalledWith([mockTx1, mockTx2]);
156+
});
157+
158+
it('getTxsByHash handles missing items', async () => {
159+
// We expect the node to fetch this item from their local p2p pool
160+
const txInMempool = await mockTx();
161+
// We expect this transaction to be requested from the network
162+
const txToBeRequested = await mockTx();
163+
// We expect this transaction to not be found
164+
const txToNotBeFound = await mockTx();
165+
166+
txPool.getTxByHash.mockImplementation(async txHash => {
167+
if (txHash === (await txInMempool.getTxHash())) {
168+
return txInMempool;
169+
}
170+
return undefined;
171+
});
172+
173+
const addTxsSpy = jest.spyOn(txPool, 'addTxs');
174+
const requestTxsSpy = jest.spyOn(client, 'requestTxsByHash');
175+
176+
p2pService.sendBatchRequest.mockResolvedValue([txToBeRequested, undefined]);
177+
178+
await client.start();
179+
180+
const query = await Promise.all([txInMempool.getTxHash(), txToBeRequested.getTxHash(), txToNotBeFound.getTxHash()]);
181+
const results = await client.getTxsByHash(query);
182+
183+
// We should return the resolved transactions
184+
expect(results).toEqual([txInMempool, txToBeRequested]);
185+
// We should add the found requested transactions to the pool
186+
expect(addTxsSpy).toHaveBeenCalledWith([txToBeRequested]);
187+
// We should request the missing transactions from the network, but only find one of them
188+
expect(requestTxsSpy).toHaveBeenCalledWith([await txToBeRequested.getTxHash(), await txToNotBeFound.getTxHash()]);
189+
});
190+
137191
describe('Chain prunes', () => {
138192
it('moves the tips on a chain reorg', async () => {
139193
blockSource.setProvenBlockNumber(0);

yarn-project/p2p/src/client/p2p_client.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -445,12 +445,17 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
445445
/**
446446
* Uses the batched Request Response protocol to request a set of transactions from the network.
447447
*/
448-
public async requestTxsByHash(txHashes: TxHash[]): Promise<Tx[]> {
449-
const txs = (await this.p2pService.sendBatchRequest(ReqRespSubProtocol.TX, txHashes)) ?? [];
450-
await this.txPool.addTxs(txs);
448+
public async requestTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]> {
449+
const txs = await this.p2pService.sendBatchRequest(ReqRespSubProtocol.TX, txHashes);
450+
451+
// Some transactions may return undefined, so we filter them out
452+
const filteredTxs = txs.filter((tx): tx is Tx => !!tx);
453+
await this.txPool.addTxs(filteredTxs);
451454
const txHashesStr = txHashes.map(tx => tx.toString()).join(', ');
452455
this.log.debug(`Received batched txs ${txHashesStr} (${txs.length} / ${txHashes.length}}) from peers`);
453-
return txs as Tx[];
456+
457+
// We return all transactions, even the not found ones to the caller, such they can handle missing items themselves.
458+
return txs;
454459
}
455460

456461
public getPendingTxs(): Promise<Tx[]> {
@@ -533,7 +538,8 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
533538
}
534539

535540
const missingTxs = await this.requestTxsByHash(missingTxHashes);
536-
return txs.filter((tx): tx is Tx => !!tx).concat(missingTxs);
541+
const fetchedMissingTxs = missingTxs.filter((tx): tx is Tx => !!tx);
542+
return txs.filter((tx): tx is Tx => !!tx).concat(fetchedMissingTxs);
537543
}
538544

539545
/**

yarn-project/p2p/src/services/libp2p/libp2p_service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
434434
sendBatchRequest<SubProtocol extends ReqRespSubProtocol>(
435435
protocol: SubProtocol,
436436
requests: InstanceType<SubProtocolMap[SubProtocol]['request']>[],
437-
): Promise<InstanceType<SubProtocolMap[SubProtocol]['response']>[] | undefined> {
437+
): Promise<(InstanceType<SubProtocolMap[SubProtocol]['response']> | undefined)[]> {
438438
return this.reqresp.sendBatchRequest(protocol, requests);
439439
}
440440

yarn-project/p2p/src/services/reqresp/reqresp.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,9 +264,9 @@ export class ReqResp {
264264
timeoutMs = 10000,
265265
maxPeers = Math.min(10, requests.length),
266266
maxRetryAttempts = 3,
267-
): Promise<InstanceType<SubProtocolMap[SubProtocol]['response']>[]> {
267+
): Promise<(InstanceType<SubProtocolMap[SubProtocol]['response']> | undefined)[]> {
268268
const responseValidator = this.subProtocolValidators[subProtocol];
269-
const responses: InstanceType<SubProtocolMap[SubProtocol]['response']>[] = new Array(requests.length);
269+
const responses: (InstanceType<SubProtocolMap[SubProtocol]['response']> | undefined)[] = new Array(requests.length);
270270
const requestBuffers = requests.map(req => req.toBuffer());
271271

272272
const requestFunction = async () => {
@@ -378,7 +378,7 @@ export class ReqResp {
378378
};
379379

380380
try {
381-
return await executeTimeout<InstanceType<SubProtocolMap[SubProtocol]['response']>[]>(
381+
return await executeTimeout<(InstanceType<SubProtocolMap[SubProtocol]['response']> | undefined)[]>(
382382
requestFunction,
383383
timeoutMs,
384384
() => new CollectiveReqRespTimeoutError(),

yarn-project/p2p/src/services/service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export interface P2PService {
5656
sendBatchRequest<Protocol extends ReqRespSubProtocol>(
5757
protocol: Protocol,
5858
requests: InstanceType<SubProtocolMap[Protocol]['request']>[],
59-
): Promise<InstanceType<SubProtocolMap[Protocol]['response']>[] | undefined>;
59+
): Promise<(InstanceType<SubProtocolMap[Protocol]['response']> | undefined)[]>;
6060

6161
// Leaky abstraction: fix https://github.com/AztecProtocol/aztec-packages/issues/7963
6262
registerBlockReceivedCallback(callback: (block: BlockProposal) => Promise<BlockAttestation | undefined>): void;

yarn-project/stdlib/src/interfaces/prover-coordination.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ export interface ProverCoordination {
1616
/**
1717
* Returns a set of transactions given their hashes if available.
1818
* @param txHashes - The hashes of the transactions, used as an ID.
19-
* @returns The transactions, if found, 'undefined' otherwise.
19+
* @returns The transactions found, no necessarily in the same order as the hashes.
2020
*/
2121
getTxsByHash(txHashes: TxHash[]): Promise<Tx[]>;
2222
}

0 commit comments

Comments
 (0)