Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksTask;
import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksWithPeerTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -31,39 +29,23 @@ public class DownloadBodiesStep

private final ProtocolSchedule protocolSchedule;
private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private final SynchronizerConfiguration synchronizerConfiguration;

public DownloadBodiesStep(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final SynchronizerConfiguration synchronizerConfiguration,
final MetricsSystem metricsSystem) {
public DownloadBodiesStep(final ProtocolSchedule protocolSchedule, final EthContext ethContext) {
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.synchronizerConfiguration = synchronizerConfiguration;
this.metricsSystem = metricsSystem;
}

@Override
public CompletableFuture<List<Block>> apply(final List<BlockHeader> blockHeaders) {
if (synchronizerConfiguration.isPeerTaskSystemEnabled()) {
return ethContext
.getScheduler()
.scheduleServiceTask(() -> getBodiesWithPeerTaskSystem(blockHeaders));
} else {
return CompleteBlocksTask.forHeaders(
protocolSchedule, ethContext, blockHeaders, metricsSystem)
.run();
}
}

private CompletableFuture<List<Block>> getBodiesWithPeerTaskSystem(
final List<BlockHeader> headers) {

final CompleteBlocksWithPeerTask completeBlocksWithPeerTask =
new CompleteBlocksWithPeerTask(protocolSchedule, headers, ethContext.getPeerTaskExecutor());
final List<Block> blocks = completeBlocksWithPeerTask.retrieveBlocksFromPeers();
return CompletableFuture.completedFuture(blocks);
return ethContext
.getScheduler()
.scheduleServiceTask(
() -> {
final CompleteBlocksWithPeerTask completeBlocksWithPeerTask =
new CompleteBlocksWithPeerTask(
protocolSchedule, blockHeaders, ethContext.getPeerTaskExecutor());
final List<Block> blocks = completeBlocksWithPeerTask.retrieveBlocksFromPeers();
return CompletableFuture.completedFuture(blocks);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public Pipeline<?> createDownloadPipelineForSyncTarget(
metricsSystem);
final RangeHeadersValidationStep validateHeadersJoinUpStep = new RangeHeadersValidationStep();
final DownloadBodiesStep downloadBodiesStep =
new DownloadBodiesStep(protocolSchedule, ethContext, syncConfig, metricsSystem);
new DownloadBodiesStep(protocolSchedule, ethContext);
final ExtractTxSignaturesStep extractTxSignaturesStep = new ExtractTxSignaturesStep();
final FullImportBlockStep importBlockStep =
new FullImportBlockStep(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package org.hyperledger.besu.ethereum.eth.sync.fullsync;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
Expand All @@ -29,6 +28,8 @@
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTaskExecutorAnswer;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
Expand All @@ -43,6 +44,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class FullSyncChainDownloaderForkTest {

Expand All @@ -51,6 +53,7 @@ public class FullSyncChainDownloaderForkTest {
protected EthContext ethContext;
protected ProtocolContext protocolContext;
private SyncState syncState;
private PeerTaskExecutor peerTaskExecutor;

private BlockchainSetupUtil localBlockchainSetup;
protected MutableBlockchain localBlockchain;
Expand All @@ -67,6 +70,7 @@ public void setupTest() throws IOException {

protocolSchedule = localBlockchainSetup.getProtocolSchedule();
protocolContext = localBlockchainSetup.getProtocolContext();
peerTaskExecutor = Mockito.mock(PeerTaskExecutor.class);
ethProtocolManager =
EthProtocolManagerTestBuilder.builder()
.setProtocolSchedule(protocolSchedule)
Expand All @@ -75,9 +79,14 @@ public void setupTest() throws IOException {
.setWorldStateArchive(localBlockchainSetup.getWorldArchive())
.setTransactionPool(localBlockchainSetup.getTransactionPool())
.setEthereumWireProtocolConfiguration(EthProtocolConfiguration.DEFAULT)
.setPeerTaskExecutor(peerTaskExecutor)
.build();
ethContext = ethProtocolManager.ethContext();
syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers());

Mockito.when(peerTaskExecutor.execute(Mockito.any(GetBodiesFromPeerTask.class)))
.thenAnswer(
new GetBodiesFromPeerTaskExecutorAnswer(otherBlockchain, ethContext.getEthPeers()));
}

@AfterEach
Expand All @@ -95,7 +104,7 @@ private ChainDownloader downloader(final SynchronizerConfiguration syncConfig) {
metricsSystem,
SyncTerminationCondition.never(),
SyncDurationMetrics.NO_OP_SYNC_DURATION_METRICS,
mock(PeerTaskExecutor.class));
peerTaskExecutor);
}

private ChainDownloader downloader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.metrics.SyncDurationMetrics;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
Expand Down Expand Up @@ -363,73 +362,6 @@ public void choosesBestPeerAsSyncTarget_byTdAndHeight(final DataStorageFormat st
assertThat(syncState.syncTarget().get().peer()).isEqualTo(peerB.getEthPeer());
}

@ParameterizedTest
@ArgumentsSource(FullSyncChainDownloaderTestArguments.class)
public void recoversFromSyncTargetDisconnect(final DataStorageFormat storageFormat) {
setupTest(storageFormat);
localBlockchainSetup.importFirstBlocks(2);
final long localChainHeadAtStart = localBlockchain.getChainHeadBlockNumber();
otherBlockchainSetup.importAllBlocks();
final long targetBlock = otherBlockchain.getChainHeadBlockNumber();
// Sanity check
assertThat(targetBlock).isGreaterThan(localBlockchain.getChainHeadBlockNumber());

final SynchronizerConfiguration syncConfig =
syncConfigBuilder().downloaderChainSegmentSize(5).downloaderHeadersRequestSize(3).build();
final ChainDownloader downloader = downloader(syncConfig);

final long bestPeerChainHead = otherBlockchain.getChainHeadBlockNumber();
final RespondingEthPeer bestPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, otherBlockchain);
final long secondBestPeerChainHead = bestPeerChainHead - 3;
final Blockchain shorterChain = createShortChain(otherBlockchain, secondBestPeerChainHead);
final RespondingEthPeer secondBestPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, shorterChain);
final RespondingEthPeer.Responder bestResponder =
RespondingEthPeer.blockchainResponder(otherBlockchain);
final RespondingEthPeer.Responder secondBestResponder =
RespondingEthPeer.blockchainResponder(shorterChain);
downloader.start();

// Process through sync target selection
bestPeer.respondWhileOtherThreadsWork(bestResponder, () -> !syncState.syncTarget().isPresent());

assertThat(syncState.syncTarget()).isPresent();
assertThat(syncState.syncTarget().get().peer()).isEqualTo(bestPeer.getEthPeer());

// The next message should be for checkpoint headers from the sync target
Awaitility.waitAtMost(10, TimeUnit.SECONDS)
.until(() -> bestPeer.peekNextOutgoingRequest().isPresent());

// Process through the first import
await()
.atMost(10, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
if (!bestPeer.respond(bestResponder)) {
secondBestPeer.respond(secondBestResponder);
}
assertThat(localBlockchain.getChainHeadBlockNumber())
.isNotEqualTo(localChainHeadAtStart);
});

// Sanity check that we haven't already passed the second best peer
assertThat(localBlockchain.getChainHeadBlockNumber()).isLessThan(secondBestPeerChainHead);

// Disconnect peer
ethProtocolManager.handleDisconnect(
bestPeer.getPeerConnection(), DisconnectReason.TOO_MANY_PEERS, true);

// Downloader should recover and sync to next best peer, but it may stall
// for 10 seconds first (by design).
secondBestPeer.respondWhileOtherThreadsWork(
secondBestResponder,
() -> localBlockchain.getChainHeadBlockNumber() != secondBestPeerChainHead);

assertThat(localBlockchain.getChainHeadBlockNumber()).isEqualTo(secondBestPeerChainHead);
}

@ParameterizedTest
@ArgumentsSource(FullSyncChainDownloaderTestArguments.class)
public void requestsCheckpointsFromSyncTarget(final DataStorageFormat storageFormat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTaskExecutorAnswer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetHeadersFromPeerTaskExecutorAnswer;
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
Expand Down Expand Up @@ -109,6 +111,10 @@ public void setupTest(final DataStorageFormat storageFormat) {
peerTaskExecutor.executeAgainstPeer(
Mockito.any(GetHeadersFromPeerTask.class), Mockito.any(EthPeer.class)))
.thenAnswer(headersAnswer);

Mockito.when(peerTaskExecutor.execute(Mockito.any(GetBodiesFromPeerTask.class)))
.thenAnswer(
new GetBodiesFromPeerTaskExecutorAnswer(otherBlockchain, ethContext.getEthPeers()));
}

@AfterEach
Expand Down
Loading