Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
### Bug fixes
- BFT forks that change block period on time-based forks don't take effect [9681](https://github.com/hyperledger/besu/issues/9681)
- Fix QBFT `RLPException` when decoding proposals from pre-26.1.0 nodes that do not include the `blockAccessList` field [#9977](https://github.com/hyperledger/besu/pull/9977)
- Wait for peers before starting chain download. Prevents an OutOfMemory (OOM) error when the node has zero peers [#9979](https://github.com/hyperledger/besu/pull/9979)

### Additions and Improvements
- Add IPv6 dual-stack support for DiscV5 peer discovery (enabled via `--Xv5-discovery-enabled`): new `--p2p-host-ipv6`, `--p2p-interface-ipv6`, and `--p2p-port-ipv6` CLI options enable a second UDP discovery socket; `--p2p-ipv6-outbound-enabled` controls whether IPv6 is preferred for outbound connections when a peer advertises both address families [#9763](https://github.com/hyperledger/besu/pull/9763); RLPx now also binds a second TCP socket on the IPv6 interface so IPv6-only peers can establish connections [#9873](https://github.com/hyperledger/besu/pull/9873)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -135,29 +136,42 @@ public CompletableFuture<List<BlockHeader>> apply(final Long startBlockNumber) {
final int currentTaskId = taskId.getAndIncrement();
final List<BlockHeader> downloadedHeaders = new ArrayList<>(headersToRequest);
final AtomicBoolean cancelled = new AtomicBoolean(false);
return ethScheduler
.scheduleServiceTask(
() ->
downloadAllHeaders(
currentTaskId,
0,
startBlockNumber,
headersToRequest,
downloadedHeaders,
cancelled))
.orTimeout(timeoutDuration.toMillis(), TimeUnit.MILLISECONDS)
.whenComplete(
(unused, throwable) -> {
if (throwable instanceof TimeoutException) {
cancelled.set(true);
LOG.trace(
"[{}] Timed out after {} ms while downloading {} backward headers from block {}",
currentTaskId,
timeoutDuration.toMillis(),
headersToRequest,
startBlockNumber);
}
});
final CompletableFuture<List<BlockHeader>> result =
ethScheduler
.scheduleServiceTask(
() ->
downloadAllHeaders(
currentTaskId,
0,
startBlockNumber,
headersToRequest,
downloadedHeaders,
cancelled))
.orTimeout(timeoutDuration.toMillis(), TimeUnit.MILLISECONDS)
.whenComplete(
(unused, throwable) -> {
if (throwable instanceof TimeoutException) {
cancelled.set(true);
LOG.trace(
"[{}] Timed out after {} ms while downloading {} backward headers from block {}",
currentTaskId,
timeoutDuration.toMillis(),
headersToRequest,
startBlockNumber);
}
});
// Stop the retry chain if the result itself is cancelled externally,
// e.g. by AsyncOperationProcessor.abort() when the pipeline is aborted.
// The whenComplete above is attached to the inner P1 future; cancelling
// the returned result (P3) does not complete P1, so we need a separate
// callback on the returned future to catch that case.
result.whenComplete(
(unused, throwable) -> {
if (throwable instanceof CancellationException) {
cancelled.set(true);
}
});
return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ public Pipeline<Long> createBackwardHeaderDownloadPipeline(final ChainSyncState

final long pivotBlockNumber = chainState.pivotBlockHeader().getNumber();
LOG.info(
"Creating backward header download pipeline from pivot={} down to lowest block={}, parallelism={}, batchSize={}",
"Creating backward header download pipeline from pivot={} down to lowest block={}, parallelism={}, batchSize={}, peers={}",
pivotBlockNumber,
anchorForHeaderDownload.getNumber(),
downloaderParallelism,
headerRequestSize);
headerRequestSize,
ethContext.getEthPeers().peerCount());

final BackwardBlockNumberSource headerSource =
new BackwardBlockNumberSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class SnapSyncChainDownloader
implements ChainDownloader, PivotUpdateListener, WorldStateHealFinishedListener {
private static final Logger LOG = LoggerFactory.getLogger(SnapSyncChainDownloader.class);
public static final int SMALL_DELAY_MILLISECONDS = 100;
static final int NO_PEER_RETRY_DELAY_MILLISECONDS = 5_000;

private final SnapSyncChainDownloadPipelineFactory pipelineFactory;
private final ProtocolSchedule protocolSchedule;
Expand Down Expand Up @@ -481,6 +482,21 @@ private void attemptDownload(final CompletableFuture<Void> overallResult) {
return;
}

// Guard against starting an expensive 160-concurrent-future pipeline with no peers.
// With no peers every future immediately hits NO_PEER_AVAILABLE, spins for 60 s, and
// the pipeline restarts every 60 s accumulating scheduler/thread overhead indefinitely.
if (ethContext.getEthPeers().peerCount() == 0) {
LOG.debug(
"No peers available, deferring chain sync pipeline start for {} ms",
NO_PEER_RETRY_DELAY_MILLISECONDS);
ethContext
.getScheduler()
.scheduleFutureTask(
() -> attemptDownload(overallResult),
Duration.ofMillis(NO_PEER_RETRY_DELAY_MILLISECONDS));
return;
}

performSingleDownloadCycle()
.whenComplete(
(downloadResult, error) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,41 @@ public void shouldNotRetryAfterTimeout() throws Exception {
}
}

@Test
public void shouldNotRetryAfterCancellation() throws Exception {
// The first request fails transiently, scheduling a retry after RETRY_DELAY (1 s).
// The pipeline cancels the future (as AsyncOperationProcessor.abort() does) before
// the retry fires. The cancelled flag must be set so the retry exits immediately
// without making a second peer request.
final EthScheduler realScheduler = new EthScheduler(1, 1, 1, new NoOpMetricsSystem());
final EthContext realEthContext = mock(EthContext.class);
when(realEthContext.getScheduler()).thenReturn(realScheduler);
when(realEthContext.getPeerTaskExecutor()).thenReturn(peerTaskExecutor);

try {
final DownloadBackwardHeadersStep step =
new DownloadBackwardHeadersStep(
protocolSchedule, realEthContext, 10, 0, Duration.ofMinutes(1));

when(peerTaskExecutor.execute(any(GetHeadersFromPeerTask.class)))
.thenReturn(
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE, emptyList()));

final CompletableFuture<List<BlockHeader>> result = step.apply(100L);

// Simulate pipeline abort cancelling the future before the 1-second retry fires.
result.cancel(true);
assertThat(result.isCancelled()).isTrue();

// The retry is scheduled 1 s after the first failure; verify it does not run.
verify(peerTaskExecutor, after(1200).times(1)).execute(any(GetHeadersFromPeerTask.class));
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertion is timing-sensitive and may be flaky/ineffective under slow or contended CI: if a buggy retry is delayed beyond 1200ms, the test can falsely pass. Consider using a controllable/fake scheduler, or increase the verification window significantly (while still bounding test runtime), so the test reliably detects an unintended second execution.

Copilot uses AI. Check for mistakes.
} finally {
realScheduler.stop();
realScheduler.awaitStop();
}
}

private List<BlockHeader> createMockHeaders(final int count, final long startBlock) {
final BlockHeaderFunctions bhf = new LocalBlockHeaderFunctions();
final List<BlockHeader> headers = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.common.ChainSyncState;
import org.hyperledger.besu.ethereum.eth.sync.common.ChainSyncStateStorage;
Expand All @@ -34,7 +40,9 @@
import org.hyperledger.besu.metrics.SyncDurationMetrics;

import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -50,6 +58,7 @@ public class SnapSyncChainDownloaderTest {
@Mock private ProtocolSchedule protocolSchedule;
@Mock private ProtocolContext protocolContext;
@Mock private EthContext ethContext;
@Mock private EthPeers ethPeers;
@Mock private SyncState syncState;
@Mock private SyncDurationMetrics syncDurationMetrics;
@Mock private MutableBlockchain blockchain;
Expand All @@ -72,6 +81,7 @@ public void setUp() {
lenient().when(blockchain.getChainHeadBlockNumber()).thenReturn(500L);
lenient().when(blockchain.getChainHeadHeader()).thenReturn(checkpointBlockHeader);
lenient().when(ethContext.getScheduler()).thenReturn(scheduler);
lenient().when(ethContext.getEthPeers()).thenReturn(ethPeers);
lenient()
.when(blockchain.getGenesisBlockHeader())
.thenReturn(new BlockHeaderTestFixture().number(0).buildHeader());
Expand Down Expand Up @@ -152,6 +162,36 @@ public void shouldReceiveWorldStateHealFinished() {
assertThatCode(downloader::onWorldStateHealFinished).doesNotThrowAnyException();
}

@Test
public void shouldDeferPipelineStartWhenNoPeersAvailable() {
when(ethPeers.peerCount()).thenReturn(0);
when(scheduler.scheduleFutureTask(any(Runnable.class), any(Duration.class)))
.thenReturn(CompletableFuture.completedFuture(null));

SnapSyncChainDownloader downloader =
new SnapSyncChainDownloader(
pipelineFactory,
protocolSchedule,
protocolContext,
ethContext,
syncState,
syncDurationMetrics,
pivotBlockHeader,
chainSyncStateStorage,
headerDownloader);

downloader.start();

// No pipeline should be created when there are no peers
verify(pipelineFactory, never()).createBackwardHeaderDownloadPipeline(any());

// A retry should be scheduled with the no-peer delay, not the fast retry delay
verify(scheduler)
.scheduleFutureTask(
any(Runnable.class),
eq(Duration.ofMillis(SnapSyncChainDownloader.NO_PEER_RETRY_DELAY_MILLISECONDS)));
}

@Test
public void shouldHandleStateTransitionFromInitialToHeadersComplete() {
// Create and store initial state
Expand Down
Loading