diff --git a/CHANGELOG.md b/CHANGELOG.md index 790d2be303f..15c0c748db8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,8 @@ - `PluginTransactionSelectorFactory.create(final SelectorsStateManager selectorsStateManager)` is deprecated for removal ### Bug fixes +- Fix eth/69 snap sync receipt root mismatch by correctly identifying Frontier transaction type in SyncTransactionReceiptEncoder [#9900](https://github.com/hyperledger/besu/pull/9900) +- Fix outstanding request counter leak in RequestManager that could cause peers to appear at capacity [#9900](https://github.com/hyperledger/besu/pull/9900) - BFT forks that change block period on time-based forks don't take effect [9681](https://github.com/hyperledger/besu/issues/9681) - Fix QBFT `RLPException` when decoding proposals from pre-26.1.0 nodes that do not include the `blockAccessList` field [#9977](https://github.com/hyperledger/besu/pull/9977) - Fix eth_simulateV1 discrepancy [9960] (https://github.com/besu-eth/besu/issues/9960) eth_simulateV1 now accepts calls where both input and data diff --git a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java index 729b68001ba..421b4219563 100644 --- a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java +++ b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java @@ -162,6 +162,8 @@ private List commandlineArgs(final BesuNode node, final Path dataDir) { } params.add("--sync-min-peers"); params.add(Integer.toString(node.getSynchronizerConfiguration().getSyncMinimumPeerCount())); + params.add("--Xsynchronizer-pivot-distance"); + params.add(Integer.toString(node.getSynchronizerConfiguration().getSyncPivotDistance())); } else { params.add("--sync-mode"); params.add("FULL"); diff --git a/acceptance-tests/tests/src/acceptanceTest/java/org/hyperledger/besu/tests/acceptance/clique/CliqueToPoSTest.java b/acceptance-tests/tests/src/acceptanceTest/java/org/hyperledger/besu/tests/acceptance/clique/CliqueToPoSTest.java index 913173442fb..c79746302d4 100644 --- a/acceptance-tests/tests/src/acceptanceTest/java/org/hyperledger/besu/tests/acceptance/clique/CliqueToPoSTest.java +++ b/acceptance-tests/tests/src/acceptanceTest/java/org/hyperledger/besu/tests/acceptance/clique/CliqueToPoSTest.java @@ -1,5 +1,5 @@ /* - * Copyright contributors to Besu. + * Copyright contributors to Hyperledger Besu. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -19,6 +19,8 @@ import org.hyperledger.besu.ethereum.eth.sync.SyncMode; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.ImmutableSnapSyncConfiguration; +import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncConfiguration; import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration; import org.hyperledger.besu.tests.acceptance.dsl.AcceptanceTestBase; import org.hyperledger.besu.tests.acceptance.dsl.node.BesuNode; @@ -89,6 +91,9 @@ public static void runBesuCommand(final Path dataPath) throws IOException, Inter @Test public void blocksAreBuiltAndNodesSyncAfterSwitchingToPoS() throws Exception { + final SnapSyncConfiguration snapServerEnabledConfig = + ImmutableSnapSyncConfiguration.builder().isSnapServerEnabled(true).build(); + final BesuNode minerNode = besu.createNode( "miner", @@ -103,6 +108,13 @@ public void blocksAreBuiltAndNodesSyncAfterSwitchingToPoS() throws Exception { .engineRpcEnabled(true) .miningEnabled()); + minerNode.setSynchronizerConfiguration( + SynchronizerConfiguration.builder() + .syncMode(SyncMode.FULL) + .syncMinimumPeerCount(1) + .snapSyncConfiguration(snapServerEnabledConfig) + .build()); + // First sync node uses full sync and starts fresh; it does not produce blocks final BesuNode syncNodeFull = besu.createNode( @@ -119,6 +131,7 @@ public void blocksAreBuiltAndNodesSyncAfterSwitchingToPoS() throws Exception { SynchronizerConfiguration.builder() .syncMode(SyncMode.FULL) .syncMinimumPeerCount(1) + .snapSyncConfiguration(snapServerEnabledConfig) .build()) .engineRpcEnabled(true)); @@ -140,6 +153,8 @@ public void blocksAreBuiltAndNodesSyncAfterSwitchingToPoS() throws Exception { SynchronizerConfiguration.builder() .syncMode(SyncMode.SNAP) .syncMinimumPeerCount(1) + .syncPivotDistance(2) + .snapSyncConfiguration(snapServerEnabledConfig) .build()); // Copy key files to the miner node datadir @@ -161,36 +176,32 @@ public void blocksAreBuiltAndNodesSyncAfterSwitchingToPoS() throws Exception { } minerNode.verify(blockchain.currentHeight(10)); - final String block10Hash = latestPayload.get("blockHash").asText(); + final String headHash = latestPayload.get("blockHash").asText(); // Add the full sync node to the cluster so it peers with minerNode cluster.addNode(syncNodeFull); // ensure the node reached TTD first - syncNodeFull.verify(blockchain.minimumHeight(4, 10)); + syncNodeFull.verify(blockchain.minimumHeight(4, 30)); - // A single forkchoiceUpdatedV1 pointing at block 10 kicks off backward sync; + // A single forkchoiceUpdatedV1 pointing at head kicks off backward sync; // syncNodeFull does not have the block yet so it responds SYNCING and begins downloading - triggerSyncViaForkchoiceUpdate(syncNodeFull, block10Hash); + triggerSyncViaForkchoiceUpdate(syncNodeFull, headHash); // Wait for full sync to complete and verify the full chain is present syncNodeFull.verify(blockchain.minimumHeight(10, 30)); - /* TODO: Uncomment when snap sync is fixed to work for small networks - // Add the snap sync node to the cluster so it peers with minerNode cluster.addNode(syncNodeSnap); syncNodeSnap.awaitPeerDiscovery(net.awaitPeerCount(2)); - // A single forkchoiceUpdatedV1 pointing at block 10 kicks off snap sync to pivot; + // A single forkchoiceUpdatedV1 pointing at head kicks off snap sync to pivot; // syncNodeSnap does not have the block yet so it responds SYNCING and begins downloading - triggerSyncViaForkchoiceUpdate(syncNodeSnap, block10Hash); + triggerSyncViaForkchoiceUpdate(syncNodeSnap, headHash); // Wait for snap sync to complete and verify the full chain is present - syncNodeSnap.verify(blockchain.minimumHeight(10, 60)); - - */ + syncNodeSnap.verify(blockchain.minimumHeight(10, 120)); } /** diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/encoding/receipt/SyncTransactionReceiptDecoder.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/encoding/receipt/SyncTransactionReceiptDecoder.java index 88a9beacfe4..5ccf8155634 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/encoding/receipt/SyncTransactionReceiptDecoder.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/encoding/receipt/SyncTransactionReceiptDecoder.java @@ -98,7 +98,7 @@ private SyncTransactionReceipt decodeEth69Receipt( final Bytes statusOrStateRoot) { Bytes transactionTypeCode = transactionByteRlp.isEmpty() - ? Bytes.of(TransactionType.FRONTIER.getSerializedType()) + ? Bytes.of(TransactionType.FRONTIER.getEthSerializedType()) : transactionByteRlp; Bytes cumulativeGasUsed = input.readBytes(); List> logs = parseLogs(input); @@ -117,7 +117,7 @@ private SyncTransactionReceipt decodeLegacyReceipt( if (bloomFilter != null) { syncTransactionReceipt = new SyncTransactionReceipt(rawRlp); } else { - Bytes transactionTypeCode = Bytes.of(TransactionType.FRONTIER.getSerializedType()); + Bytes transactionTypeCode = Bytes.of(TransactionType.FRONTIER.getEthSerializedType()); List> logs = parseLogs(input); syncTransactionReceipt = new SyncTransactionReceipt( diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/encoding/receipt/SyncTransactionReceiptEncoder.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/encoding/receipt/SyncTransactionReceiptEncoder.java index 12ccce4d162..a5aaa4c48bb 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/encoding/receipt/SyncTransactionReceiptEncoder.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/encoding/receipt/SyncTransactionReceiptEncoder.java @@ -32,10 +32,11 @@ public SyncTransactionReceiptEncoder(final SimpleNoCopyRlpEncoder rlpEncoder) { } public Bytes encodeForRootCalculation(final SyncTransactionReceipt receipt) { + final Bytes typeCode = receipt.getTransactionTypeCode(); final boolean isFrontier = - !receipt.getTransactionTypeCode().isEmpty() - && receipt.getTransactionTypeCode().get(0) - == TransactionType.FRONTIER.getSerializedType(); + typeCode.isEmpty() + || typeCode.get(0) == TransactionType.FRONTIER.getEthSerializedType() + || typeCode.get(0) == TransactionType.FRONTIER.getSerializedType(); List encodedLogs = receipt.getLogs().stream() diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/RequestManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/RequestManager.java index 20d7c31dfdd..89f43346253 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/RequestManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/RequestManager.java @@ -27,6 +27,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -67,14 +68,15 @@ public ResponseStream dispatchRequest(final RequestSender sender, final MessageD } public void dispatchResponse(final EthMessage ethMessage) { - final Collection streams = List.copyOf(responseStreams.values()); - final int count = outstandingRequests.decrementAndGet(); try { final Map.Entry requestIdAndEthMessage = ethMessage.getData().unwrapMessageData(); Optional.ofNullable(responseStreams.get(requestIdAndEthMessage.getKey())) .ifPresentOrElse( - responseStream -> responseStream.processMessage(requestIdAndEthMessage.getValue()), + responseStream -> { + responseStream.releaseOutstandingRequest(); + responseStream.processMessage(requestIdAndEthMessage.getValue()); + }, // Consider incorrect requestIds to be a useless response; too // many of these and we will disconnect. () -> peer.recordUselessResponse("Request ID incorrect")); @@ -90,9 +92,9 @@ public void dispatchResponse(final EthMessage ethMessage) { DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED); } - if (count == 0) { + if (outstandingRequests.get() == 0) { // No possibility of any remaining outstanding messages - closeOutstandingStreams(streams); + closeOutstandingStreams(List.copyOf(responseStreams.values())); } } @@ -101,7 +103,15 @@ public void close() { } private ResponseStream createStream(final BigInteger requestId) { - final ResponseStream stream = new ResponseStream(peer, () -> deregisterStream(requestId)); + final AtomicBoolean released = new AtomicBoolean(false); + final Runnable releaser = + () -> { + if (released.compareAndSet(false, true)) { + outstandingRequests.decrementAndGet(); + } + }; + final ResponseStream stream = + new ResponseStream(peer, () -> deregisterStream(requestId), releaser); responseStreams.put(requestId, stream); return stream; } @@ -152,13 +162,18 @@ private Response(final boolean closed, final MessageData message) { public static class ResponseStream { private final EthPeer peer; private final DeregistrationProcessor deregisterCallback; + private final Runnable outstandingRequestReleaser; private final Queue bufferedResponses = new ConcurrentLinkedQueue<>(); private volatile boolean closed = false; private volatile ResponseCallback responseCallback = null; - public ResponseStream(final EthPeer peer, final DeregistrationProcessor deregisterCallback) { + public ResponseStream( + final EthPeer peer, + final DeregistrationProcessor deregisterCallback, + final Runnable outstandingRequestReleaser) { this.peer = peer; this.deregisterCallback = deregisterCallback; + this.outstandingRequestReleaser = outstandingRequestReleaser; } public ResponseStream then(final ResponseCallback callback) { @@ -172,12 +187,23 @@ public ResponseStream then(final ResponseCallback callback) { return this; } + /** + * Releases the outstanding request capacity for this stream. Uses CAS to ensure exactly-once + * semantics: either dispatchResponse() or close() will release the capacity, but not both. + */ + void releaseOutstandingRequest() { + outstandingRequestReleaser.run(); + } + public void close() { if (closed) { return; } closed = true; deregisterCallback.exec(); + // Release outstanding request capacity if not already released by dispatchResponse(). + // This handles the case where a request times out without receiving a response. + outstandingRequestReleaser.run(); bufferedResponses.add(new Response(true, null)); dispatchBufferedResponses(); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java index 611d4a72cd2..5ba50aab9a7 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/EthPeerTest.java @@ -123,7 +123,11 @@ public void shouldHaveAvailableCapacityUntilOutstandingRequestLimitIsReached() assertThat(peer.hasAvailableRequestCapacity()).isFalse(); assertThat(peer.outstandingRequests()).isEqualTo(5); - peer.dispatch(new EthMessage(peer, BlockBodiesMessage.create(emptyList()))); + peer.dispatch( + new EthMessage( + peer, + BlockBodiesMessage.create(emptyList()) + .wrapMessageData(java.math.BigInteger.valueOf(1)))); assertThat(peer.hasAvailableRequestCapacity()).isTrue(); assertThat(peer.outstandingRequests()).isEqualTo(4); } @@ -424,7 +428,7 @@ private void messageStream( // Dispatch unrelated message and check that it is not process EthMessage otherEthMessage = - new EthMessage(peer, otherMessage.wrapMessageData(BigInteger.valueOf(requestIdCounter++))); + new EthMessage(peer, otherMessage.wrapMessageData(BigInteger.valueOf(999))); peer.dispatch(otherEthMessage); assertThat(messageCount.get()).isEqualTo(1); assertThat(closedCount.get()).isEqualTo(0); @@ -433,7 +437,7 @@ private void messageStream( new EthMessage(peer, targetMessage.wrapMessageData(BigInteger.valueOf(requestIdCounter++))); // Dispatch last outstanding message and check that streams are closed peer.dispatch(targetEthMessage); - assertThat(messageCount.get()).isEqualTo(1); + assertThat(messageCount.get()).isEqualTo(2); assertThat(closedCount.get()).isEqualTo(2); targetEthMessage = @@ -441,7 +445,7 @@ private void messageStream( // Check that no new messages are delivered getStream.get(peer); peer.dispatch(targetEthMessage); - assertThat(messageCount.get()).isEqualTo(1); + assertThat(messageCount.get()).isEqualTo(2); assertThat(closedCount.get()).isEqualTo(2); targetEthMessage = diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java index 8bb81467bb6..04ec6c00dc6 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/RequestManagerTest.java @@ -230,6 +230,20 @@ private EthPeer createPeer() { Bytes.random(64)); } + @Test + public void recordsUselessResponseWhenRequestIdDoesNotMatchOutstandingRequest() throws Exception { + final EthPeer peer = createPeer(); + final RequestManager requestManager = new RequestManager(peer, EthProtocol.NAME); + + // Dispatch responses with request IDs that were never issued - each should record a useless + // response. After USELESS_RESPONSE_THRESHOLD useless responses the peer should be disconnected. + for (int i = 0; i < PeerReputation.USELESS_RESPONSE_THRESHOLD; i++) { + requestManager.dispatchResponse(mockMessage(peer)); + } + + assertThat(peer.isDisconnected()).isTrue(); + } + @Test public void disconnectsPeerOnBadMessage() throws Exception { final EthPeer peer = createPeer(); @@ -251,4 +265,67 @@ public void disconnectsPeerOnBadMessage() throws Exception { requestManager.dispatchResponse(mockMessage); assertThat(peer.isDisconnected()).isTrue(); } + + @Test + public void closingStreamWithoutResponseReleasesOutstandingRequest() throws Exception { + final EthPeer peer = createPeer(); + final RequestManager requestManager = new RequestManager(peer, EthProtocol.NAME); + + final RequestManager.RequestSender sender = __ -> {}; + // Send a request - outstanding should go to 1 + final RequestManager.ResponseStream stream = + requestManager.dispatchRequest(sender, new RawMessage(0x01, Bytes.EMPTY)); + assertThat(requestManager.outstandingRequests()).isEqualTo(1); + + // Close the stream without dispatching a response (simulates timeout) + stream.close(); + + // Outstanding requests should be released back to 0 + assertThat(requestManager.outstandingRequests()).isEqualTo(0); + } + + @Test + public void closingStreamAfterResponseDoesNotDoubleDecrement() throws Exception { + final EthPeer peer = createPeer(); + final RequestManager requestManager = new RequestManager(peer, EthProtocol.NAME); + + final RequestManager.RequestSender sender = __ -> {}; + // Send a request + final RequestManager.ResponseStream stream = + requestManager.dispatchRequest(sender, new RawMessage(0x01, Bytes.EMPTY)); + assertThat(requestManager.outstandingRequests()).isEqualTo(1); + + // Dispatch response - outstanding should go to 0 + final EthMessage mockMessage = mockMessage(peer); + requestManager.dispatchResponse(mockMessage); + assertThat(requestManager.outstandingRequests()).isEqualTo(0); + + // Close the stream after response was received - should not go negative + stream.close(); + assertThat(requestManager.outstandingRequests()).isEqualTo(0); + } + + @Test + public void multipleTimedOutRequestsAllReleaseCapacity() throws Exception { + final EthPeer peer = createPeer(); + final RequestManager requestManager = new RequestManager(peer, EthProtocol.NAME); + + final RequestManager.RequestSender sender = __ -> {}; + // Send 3 requests + final RequestManager.ResponseStream stream1 = + requestManager.dispatchRequest(sender, new RawMessage(0x01, Bytes.EMPTY)); + final RequestManager.ResponseStream stream2 = + requestManager.dispatchRequest(sender, new RawMessage(0x01, Bytes.EMPTY)); + final RequestManager.ResponseStream stream3 = + requestManager.dispatchRequest(sender, new RawMessage(0x01, Bytes.EMPTY)); + assertThat(requestManager.outstandingRequests()).isEqualTo(3); + + // Close all streams without responses (simulates 3 timeouts) + stream1.close(); + assertThat(requestManager.outstandingRequests()).isEqualTo(2); + stream2.close(); + assertThat(requestManager.outstandingRequests()).isEqualTo(1); + stream3.close(); + assertThat(requestManager.outstandingRequests()).isEqualTo(0); + } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetSyncReceiptsFromPeerTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetSyncReceiptsFromPeerTaskTest.java index f8da0991440..2737cc15a67 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetSyncReceiptsFromPeerTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/peertask/task/GetSyncReceiptsFromPeerTaskTest.java @@ -34,7 +34,9 @@ import org.hyperledger.besu.ethereum.core.SyncBlockBody; import org.hyperledger.besu.ethereum.core.SyncTransactionReceipt; import org.hyperledger.besu.ethereum.core.TransactionReceipt; +import org.hyperledger.besu.ethereum.core.encoding.receipt.SyncTransactionReceiptDecoder; import org.hyperledger.besu.ethereum.core.encoding.receipt.SyncTransactionReceiptEncoder; +import org.hyperledger.besu.ethereum.core.encoding.receipt.TransactionReceiptEncoder; import org.hyperledger.besu.ethereum.core.encoding.receipt.TransactionReceiptEncodingConfiguration; import org.hyperledger.besu.ethereum.eth.EthProtocol; import org.hyperledger.besu.ethereum.eth.core.Utils; @@ -424,6 +426,35 @@ public void validateResultFailsReceiptRootDoesNotMatch() { List.of()))); } + @Test + public void validateResultPassesForFrontierReceiptsAfterDecoderRoundTrip() { + // Regression test for eth/69 Frontier receipt root calculation. + // Exercises the full decode path (encodeForRootCalculation is invoked) rather than + // the single-arg constructor path used by toResponseReceipt() in other tests. + final MockedBlock block = mockBlock(1, 2); + + final GetSyncReceiptsFromPeerTask task = + createTask(new Request(List.of(block.block), List.of()), protocolSchedule); + + final SyncTransactionReceiptDecoder decoder = new SyncTransactionReceiptDecoder(); + final List decodedReceipts = + block.receipts.stream() + .map( + receipt -> { + final BytesValueRLPOutput out = new BytesValueRLPOutput(); + TransactionReceiptEncoder.writeTo( + receipt, + out, + TransactionReceiptEncodingConfiguration.ETH69_RECEIPT_CONFIGURATION); + return decoder.decode(out.encoded()); + }) + .toList(); + + assertEquals( + PeerTaskValidationResponse.RESULTS_VALID_AND_GOOD, + task.validateResult(new Response(Map.of(block.block, decodedReceipts), List.of()))); + } + @Test public void validateResultSuccessWhenPartialBlockIsIncomplete() { final MockedBlock mockedBlock = mockBlock(1, 3);