Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6bc8b15
test: combine snap fixes and enable Besu-to-Besu snap sync test
macfarla Feb 25, 2026
9a7d9e2
allow pivot distance to be customized for test
macfarla Feb 26, 2026
64b4d39
request tracking bug
macfarla Feb 26, 2026
9ab2b92
Frontier receipts were never detected as Frontier
macfarla Feb 26, 2026
8c06212
Merge branch 'main' of github.com:hyperledger/besu into test/snap-bes…
macfarla Feb 26, 2026
8a96d09
changelog entry
macfarla Feb 26, 2026
5a5cff1
revert changes from #9855 and #9877 to keep pr minimal
macfarla Feb 26, 2026
817371c
pr number in changelog
macfarla Feb 26, 2026
e93597b
fixed EthPeerTests
macfarla Feb 26, 2026
682283e
reduce timeout for test
macfarla Feb 26, 2026
cce004a
simplify
macfarla Feb 26, 2026
c52f7b7
Merge branch 'main' of github.com:hyperledger/besu into test/snap-bes…
macfarla Feb 26, 2026
38968ca
Revert "simplify"
macfarla Feb 27, 2026
d938b95
fix: use getEthSerializedType in SyncTransactionReceiptDecoder for Fr…
macfarla Feb 27, 2026
2f9e9c9
Merge branch 'main' into test/snap-besu-to-besu
macfarla Feb 27, 2026
4564de5
merge
macfarla Mar 10, 2026
11af979
Merge branch 'test/snap-besu-to-besu' of github.com:macfarla/besu int…
macfarla Mar 10, 2026
d1c9ce8
Merge branch 'main' of github.com:hyperledger/besu into test/snap-bes…
macfarla Mar 12, 2026
66155de
logging improvements for breach of protocol for rlp
macfarla Mar 12, 2026
760d30e
Add regression test for eth/69 Frontier receipt trie root mismatch
macfarla Mar 13, 2026
c18929f
Fix typed receipt encoding in putSyncTransactionReceipts
macfarla Mar 13, 2026
63d1971
reduce comments
macfarla Mar 17, 2026
b4947af
Merge branch 'main' of github.com:hyperledger/besu into test/snap-bes…
macfarla Mar 17, 2026
c9a33d5
Merge branch 'main' into test/snap-besu-to-besu
macfarla Mar 17, 2026
57864b1
Merge branch 'main' of github.com:hyperledger/besu into test/snap-bes…
macfarla Mar 18, 2026
51e2d6d
review comments
macfarla Mar 18, 2026
b12815d
merge and review comments
macfarla Mar 18, 2026
0433166
Merge branch 'test/snap-besu-to-besu' of github.com:macfarla/besu int…
macfarla Mar 18, 2026
47e09f5
Merge branch 'main' into test/snap-besu-to-besu
macfarla Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
- `PluginTransactionSelectorFactory.create(final SelectorsStateManager selectorsStateManager)` is deprecated for removal

### Bug fixes
- Fix eth/69 snap sync receipt root mismatch by correctly identifying Frontier transaction type in SyncTransactionReceiptEncoder [#9900](https://github.com/hyperledger/besu/pull/9900)
- Fix outstanding request counter leak in RequestManager that could cause peers to appear at capacity [#9900](https://github.com/hyperledger/besu/pull/9900)
- BFT forks that change block period on time-based forks don't take effect [9681](https://github.com/hyperledger/besu/issues/9681)
- Fix QBFT `RLPException` when decoding proposals from pre-26.1.0 nodes that do not include the `blockAccessList` field [#9977](https://github.com/hyperledger/besu/pull/9977)
- Fix eth_simulateV1 discrepancy [9960] (https://github.com/besu-eth/besu/issues/9960) eth_simulateV1 now accepts calls where both input and data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ private List<String> commandlineArgs(final BesuNode node, final Path dataDir) {
}
params.add("--sync-min-peers");
params.add(Integer.toString(node.getSynchronizerConfiguration().getSyncMinimumPeerCount()));
params.add("--Xsynchronizer-pivot-distance");
params.add(Integer.toString(node.getSynchronizerConfiguration().getSyncPivotDistance()));
} else {
params.add("--sync-mode");
params.add("FULL");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright contributors to Besu.
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -19,6 +19,8 @@

import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.ImmutableSnapSyncConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncConfiguration;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
import org.hyperledger.besu.tests.acceptance.dsl.AcceptanceTestBase;
import org.hyperledger.besu.tests.acceptance.dsl.node.BesuNode;
Expand Down Expand Up @@ -89,6 +91,9 @@ public static void runBesuCommand(final Path dataPath) throws IOException, Inter
@Test
public void blocksAreBuiltAndNodesSyncAfterSwitchingToPoS() throws Exception {

final SnapSyncConfiguration snapServerEnabledConfig =
ImmutableSnapSyncConfiguration.builder().isSnapServerEnabled(true).build();

final BesuNode minerNode =
besu.createNode(
"miner",
Expand All @@ -103,6 +108,13 @@ public void blocksAreBuiltAndNodesSyncAfterSwitchingToPoS() throws Exception {
.engineRpcEnabled(true)
.miningEnabled());

minerNode.setSynchronizerConfiguration(
SynchronizerConfiguration.builder()
.syncMode(SyncMode.FULL)
.syncMinimumPeerCount(1)
.snapSyncConfiguration(snapServerEnabledConfig)
.build());

// First sync node uses full sync and starts fresh; it does not produce blocks
final BesuNode syncNodeFull =
besu.createNode(
Expand All @@ -119,6 +131,7 @@ public void blocksAreBuiltAndNodesSyncAfterSwitchingToPoS() throws Exception {
SynchronizerConfiguration.builder()
.syncMode(SyncMode.FULL)
.syncMinimumPeerCount(1)
.snapSyncConfiguration(snapServerEnabledConfig)
.build())
.engineRpcEnabled(true));

Expand All @@ -140,6 +153,8 @@ public void blocksAreBuiltAndNodesSyncAfterSwitchingToPoS() throws Exception {
SynchronizerConfiguration.builder()
.syncMode(SyncMode.SNAP)
.syncMinimumPeerCount(1)
.syncPivotDistance(2)
.snapSyncConfiguration(snapServerEnabledConfig)
.build());

// Copy key files to the miner node datadir
Expand All @@ -161,36 +176,32 @@ public void blocksAreBuiltAndNodesSyncAfterSwitchingToPoS() throws Exception {
}
minerNode.verify(blockchain.currentHeight(10));

final String block10Hash = latestPayload.get("blockHash").asText();
final String headHash = latestPayload.get("blockHash").asText();

// Add the full sync node to the cluster so it peers with minerNode
cluster.addNode(syncNodeFull);

// ensure the node reached TTD first
syncNodeFull.verify(blockchain.minimumHeight(4, 10));
syncNodeFull.verify(blockchain.minimumHeight(4, 30));

// A single forkchoiceUpdatedV1 pointing at block 10 kicks off backward sync;
// A single forkchoiceUpdatedV1 pointing at head kicks off backward sync;
// syncNodeFull does not have the block yet so it responds SYNCING and begins downloading
triggerSyncViaForkchoiceUpdate(syncNodeFull, block10Hash);
triggerSyncViaForkchoiceUpdate(syncNodeFull, headHash);

// Wait for full sync to complete and verify the full chain is present
syncNodeFull.verify(blockchain.minimumHeight(10, 30));

/* TODO: Uncomment when snap sync is fixed to work for small networks

// Add the snap sync node to the cluster so it peers with minerNode
cluster.addNode(syncNodeSnap);

syncNodeSnap.awaitPeerDiscovery(net.awaitPeerCount(2));

// A single forkchoiceUpdatedV1 pointing at block 10 kicks off snap sync to pivot;
// A single forkchoiceUpdatedV1 pointing at head kicks off snap sync to pivot;
// syncNodeSnap does not have the block yet so it responds SYNCING and begins downloading
triggerSyncViaForkchoiceUpdate(syncNodeSnap, block10Hash);
triggerSyncViaForkchoiceUpdate(syncNodeSnap, headHash);

// Wait for snap sync to complete and verify the full chain is present
syncNodeSnap.verify(blockchain.minimumHeight(10, 60));

*/
syncNodeSnap.verify(blockchain.minimumHeight(10, 120));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private SyncTransactionReceipt decodeEth69Receipt(
final Bytes statusOrStateRoot) {
Bytes transactionTypeCode =
transactionByteRlp.isEmpty()
? Bytes.of(TransactionType.FRONTIER.getSerializedType())
? Bytes.of(TransactionType.FRONTIER.getEthSerializedType())
: transactionByteRlp;
Bytes cumulativeGasUsed = input.readBytes();
List<List<Bytes>> logs = parseLogs(input);
Expand All @@ -117,7 +117,7 @@ private SyncTransactionReceipt decodeLegacyReceipt(
if (bloomFilter != null) {
syncTransactionReceipt = new SyncTransactionReceipt(rawRlp);
} else {
Bytes transactionTypeCode = Bytes.of(TransactionType.FRONTIER.getSerializedType());
Bytes transactionTypeCode = Bytes.of(TransactionType.FRONTIER.getEthSerializedType());
List<List<Bytes>> logs = parseLogs(input);
syncTransactionReceipt =
new SyncTransactionReceipt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ public SyncTransactionReceiptEncoder(final SimpleNoCopyRlpEncoder rlpEncoder) {
}

public Bytes encodeForRootCalculation(final SyncTransactionReceipt receipt) {
final Bytes typeCode = receipt.getTransactionTypeCode();
final boolean isFrontier =
!receipt.getTransactionTypeCode().isEmpty()
&& receipt.getTransactionTypeCode().get(0)
== TransactionType.FRONTIER.getSerializedType();
typeCode.isEmpty()
|| typeCode.get(0) == TransactionType.FRONTIER.getEthSerializedType()
|| typeCode.get(0) == TransactionType.FRONTIER.getSerializedType();

List<Bytes> encodedLogs =
receipt.getLogs().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -67,14 +68,15 @@ public ResponseStream dispatchRequest(final RequestSender sender, final MessageD
}

public void dispatchResponse(final EthMessage ethMessage) {
final Collection<ResponseStream> streams = List.copyOf(responseStreams.values());
final int count = outstandingRequests.decrementAndGet();
try {
final Map.Entry<BigInteger, MessageData> requestIdAndEthMessage =
ethMessage.getData().unwrapMessageData();
Optional.ofNullable(responseStreams.get(requestIdAndEthMessage.getKey()))
.ifPresentOrElse(
responseStream -> responseStream.processMessage(requestIdAndEthMessage.getValue()),
responseStream -> {
responseStream.releaseOutstandingRequest();
responseStream.processMessage(requestIdAndEthMessage.getValue());
},
// Consider incorrect requestIds to be a useless response; too
// many of these and we will disconnect.
() -> peer.recordUselessResponse("Request ID incorrect"));
Expand All @@ -90,9 +92,9 @@ public void dispatchResponse(final EthMessage ethMessage) {
DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL_MALFORMED_MESSAGE_RECEIVED);
}

if (count == 0) {
if (outstandingRequests.get() == 0) {
// No possibility of any remaining outstanding messages
closeOutstandingStreams(streams);
closeOutstandingStreams(List.copyOf(responseStreams.values()));
}
}

Expand All @@ -101,7 +103,15 @@ public void close() {
}

private ResponseStream createStream(final BigInteger requestId) {
final ResponseStream stream = new ResponseStream(peer, () -> deregisterStream(requestId));
final AtomicBoolean released = new AtomicBoolean(false);
final Runnable releaser =
() -> {
if (released.compareAndSet(false, true)) {
outstandingRequests.decrementAndGet();
}
};
final ResponseStream stream =
new ResponseStream(peer, () -> deregisterStream(requestId), releaser);
responseStreams.put(requestId, stream);
return stream;
}
Expand Down Expand Up @@ -152,13 +162,18 @@ private Response(final boolean closed, final MessageData message) {
public static class ResponseStream {
private final EthPeer peer;
private final DeregistrationProcessor deregisterCallback;
private final Runnable outstandingRequestReleaser;
private final Queue<Response> bufferedResponses = new ConcurrentLinkedQueue<>();
private volatile boolean closed = false;
private volatile ResponseCallback responseCallback = null;

public ResponseStream(final EthPeer peer, final DeregistrationProcessor deregisterCallback) {
public ResponseStream(
final EthPeer peer,
final DeregistrationProcessor deregisterCallback,
final Runnable outstandingRequestReleaser) {
this.peer = peer;
this.deregisterCallback = deregisterCallback;
this.outstandingRequestReleaser = outstandingRequestReleaser;
}

public ResponseStream then(final ResponseCallback callback) {
Expand All @@ -172,12 +187,23 @@ public ResponseStream then(final ResponseCallback callback) {
return this;
}

/**
* Releases the outstanding request capacity for this stream. Uses CAS to ensure exactly-once
* semantics: either dispatchResponse() or close() will release the capacity, but not both.
*/
void releaseOutstandingRequest() {
outstandingRequestReleaser.run();
}

public void close() {
if (closed) {
return;
}
closed = true;
deregisterCallback.exec();
// Release outstanding request capacity if not already released by dispatchResponse().
// This handles the case where a request times out without receiving a response.
outstandingRequestReleaser.run();
bufferedResponses.add(new Response(true, null));
dispatchBufferedResponses();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ public void shouldHaveAvailableCapacityUntilOutstandingRequestLimitIsReached()
assertThat(peer.hasAvailableRequestCapacity()).isFalse();
assertThat(peer.outstandingRequests()).isEqualTo(5);

peer.dispatch(new EthMessage(peer, BlockBodiesMessage.create(emptyList())));
peer.dispatch(
new EthMessage(
peer,
BlockBodiesMessage.create(emptyList())
.wrapMessageData(java.math.BigInteger.valueOf(1))));
assertThat(peer.hasAvailableRequestCapacity()).isTrue();
assertThat(peer.outstandingRequests()).isEqualTo(4);
}
Expand Down Expand Up @@ -424,7 +428,7 @@ private void messageStream(

// Dispatch unrelated message and check that it is not process
EthMessage otherEthMessage =
new EthMessage(peer, otherMessage.wrapMessageData(BigInteger.valueOf(requestIdCounter++)));
new EthMessage(peer, otherMessage.wrapMessageData(BigInteger.valueOf(999)));
peer.dispatch(otherEthMessage);
assertThat(messageCount.get()).isEqualTo(1);
assertThat(closedCount.get()).isEqualTo(0);
Expand All @@ -433,15 +437,15 @@ private void messageStream(
new EthMessage(peer, targetMessage.wrapMessageData(BigInteger.valueOf(requestIdCounter++)));
// Dispatch last outstanding message and check that streams are closed
peer.dispatch(targetEthMessage);
assertThat(messageCount.get()).isEqualTo(1);
assertThat(messageCount.get()).isEqualTo(2);
assertThat(closedCount.get()).isEqualTo(2);

targetEthMessage =
new EthMessage(peer, targetMessage.wrapMessageData(BigInteger.valueOf(requestIdCounter++)));
// Check that no new messages are delivered
getStream.get(peer);
peer.dispatch(targetEthMessage);
assertThat(messageCount.get()).isEqualTo(1);
assertThat(messageCount.get()).isEqualTo(2);
assertThat(closedCount.get()).isEqualTo(2);

targetEthMessage =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,20 @@ private EthPeer createPeer() {
Bytes.random(64));
}

@Test
public void recordsUselessResponseWhenRequestIdDoesNotMatchOutstandingRequest() throws Exception {
final EthPeer peer = createPeer();
final RequestManager requestManager = new RequestManager(peer, EthProtocol.NAME);

// Dispatch responses with request IDs that were never issued - each should record a useless
// response. After USELESS_RESPONSE_THRESHOLD useless responses the peer should be disconnected.
for (int i = 0; i < PeerReputation.USELESS_RESPONSE_THRESHOLD; i++) {
requestManager.dispatchResponse(mockMessage(peer));
}

assertThat(peer.isDisconnected()).isTrue();
}

@Test
public void disconnectsPeerOnBadMessage() throws Exception {
final EthPeer peer = createPeer();
Expand All @@ -251,4 +265,67 @@ public void disconnectsPeerOnBadMessage() throws Exception {
requestManager.dispatchResponse(mockMessage);
assertThat(peer.isDisconnected()).isTrue();
}

@Test
public void closingStreamWithoutResponseReleasesOutstandingRequest() throws Exception {
final EthPeer peer = createPeer();
final RequestManager requestManager = new RequestManager(peer, EthProtocol.NAME);

final RequestManager.RequestSender sender = __ -> {};
// Send a request - outstanding should go to 1
final RequestManager.ResponseStream stream =
requestManager.dispatchRequest(sender, new RawMessage(0x01, Bytes.EMPTY));
assertThat(requestManager.outstandingRequests()).isEqualTo(1);

// Close the stream without dispatching a response (simulates timeout)
stream.close();

// Outstanding requests should be released back to 0
assertThat(requestManager.outstandingRequests()).isEqualTo(0);
}

@Test
public void closingStreamAfterResponseDoesNotDoubleDecrement() throws Exception {
final EthPeer peer = createPeer();
final RequestManager requestManager = new RequestManager(peer, EthProtocol.NAME);

final RequestManager.RequestSender sender = __ -> {};
// Send a request
final RequestManager.ResponseStream stream =
requestManager.dispatchRequest(sender, new RawMessage(0x01, Bytes.EMPTY));
assertThat(requestManager.outstandingRequests()).isEqualTo(1);

// Dispatch response - outstanding should go to 0
final EthMessage mockMessage = mockMessage(peer);
requestManager.dispatchResponse(mockMessage);
assertThat(requestManager.outstandingRequests()).isEqualTo(0);

// Close the stream after response was received - should not go negative
stream.close();
assertThat(requestManager.outstandingRequests()).isEqualTo(0);
}

@Test
public void multipleTimedOutRequestsAllReleaseCapacity() throws Exception {
final EthPeer peer = createPeer();
final RequestManager requestManager = new RequestManager(peer, EthProtocol.NAME);

final RequestManager.RequestSender sender = __ -> {};
// Send 3 requests
final RequestManager.ResponseStream stream1 =
requestManager.dispatchRequest(sender, new RawMessage(0x01, Bytes.EMPTY));
final RequestManager.ResponseStream stream2 =
requestManager.dispatchRequest(sender, new RawMessage(0x01, Bytes.EMPTY));
final RequestManager.ResponseStream stream3 =
requestManager.dispatchRequest(sender, new RawMessage(0x01, Bytes.EMPTY));
assertThat(requestManager.outstandingRequests()).isEqualTo(3);

// Close all streams without responses (simulates 3 timeouts)
stream1.close();
assertThat(requestManager.outstandingRequests()).isEqualTo(2);
stream2.close();
assertThat(requestManager.outstandingRequests()).isEqualTo(1);
stream3.close();
assertThat(requestManager.outstandingRequests()).isEqualTo(0);
}
}
Loading
Loading