Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@

import java.time.Clock;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -632,12 +631,6 @@ public boolean hasSupportForMessage(final int messageCode) {
.anyMatch(cap -> EthProtocol.get().isValidMessageCode(cap.getVersion(), messageCode));
}

public Capability getMaxAgreedCapability() {
return getAgreedCapabilities().stream()
.max(Comparator.comparingInt(Capability::getVersion))
.orElseThrow(() -> new IllegalStateException("No capabilities available"));
}

@Override
public String toString() {
return String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerClientName;
Expand Down Expand Up @@ -556,95 +557,80 @@ public String toString() {

private void ethPeerStatusExchanged(final EthPeer peer) {
// We have a connection to a peer that is on the right chain and is willing to connect to us.
// Find out what the EthPeer block height is and whether it can serve snap data (if we are doing
// snap sync)
LOG.debug("Peer {} status exchanged", peer);
assert tracker != null : "ChainHeadTracker must be set before EthPeers can be used";
CompletableFuture<BlockHeader> future = tracker.getBestHeaderFromPeer(peer);

future.whenComplete(
(peerHeadBlockHeader, error) -> {
if (peerHeadBlockHeader == null) {
LOG.debug(
"Failed to retrieve chain head info. Disconnecting {}... {}",
peer.getLoggableId(),
error);
peer.disconnect(
DisconnectMessage.DisconnectReason.USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_HEAD);
} else {

// we can check trailing peers now
final TrailingPeerRequirements trailingPeerRequirements =
trailingPeerRequirementsSupplier.get();
if (trailingPeerRequirements != null) {
if (peer.chainState().getEstimatedHeight()
< trailingPeerRequirements.getMinimumHeightToBeUpToDate()) {
if (!(getNumTrailingPeers(trailingPeerRequirements.getMinimumHeightToBeUpToDate())
< trailingPeerRequirements.getMaxTrailingPeers())) {
LOG.atTrace()
.setMessage(
"Adding trailing peer {} would exceed max trailing peers {}. Disconnecting...")
.addArgument(peer.getLoggableId())
.addArgument(trailingPeerRequirements.getMaxTrailingPeers())
.log();
peer.disconnect(
DisconnectMessage.DisconnectReason.USELESS_PEER_EXCEEDS_TRAILING_PEERS);
return;
}
}
}

// Handle chain head tracking based on protocol compatibility.
if (EthProtocol.isEth69Compatible(peer.getMaxAgreedCapability())) {
// For Eth69-compatible peers, the chain head information is included in the status message.
// Skip fetching the chain head separately and directly check trailing peer requirements.
checkTrailingPeerRequirements(peer);
} else {
// For non-Eth69-compatible peers, fetch the chain head before checking trailing requirements.
CompletableFuture<BlockHeader> future = tracker.getBestHeaderFromPeer(peer);
future.whenComplete(
(peerHeadBlockHeader, error) -> {
// If we successfully retrieved the chain head, check trailing peer requirements and
// update
// the peer's estimated height.
if (peerHeadBlockHeader != null) {
checkTrailingPeerRequirements(peer);
peer.chainState().updateHeightEstimate(peerHeadBlockHeader.getNumber());
peer.chainState().updateHeightEstimate(peerHeadBlockHeader.getNumber());
CompletableFuture<Void> isServingSnapFuture;
if (syncMode == SyncMode.SNAP || syncMode == SyncMode.CHECKPOINT) {
// even if we have finished the snap sync, we still want to know if the peer is a snap
// server
isServingSnapFuture =
CompletableFuture.runAsync(
() -> {
try {
checkIsSnapServer(peer, peerHeadBlockHeader);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} else {
LOG.debug(
"Failed to retrieve chain head info. Disconnecting {}... {}",
peer.getLoggableId(),
error);
peer.disconnect(
DisconnectMessage.DisconnectReason.USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_HEAD);
isServingSnapFuture = CompletableFuture.completedFuture(null);
}
});
}

// If we are doing snap sync, check if the peer is a snap server
checkSnapServer(peer);
}

private void checkTrailingPeerRequirements(final EthPeer peer) {
// we can check trailing peers now
final TrailingPeerRequirements trailingPeerRequirements =
trailingPeerRequirementsSupplier.get();
if (trailingPeerRequirements != null) {
if (peer.chainState().getEstimatedHeight()
< trailingPeerRequirements.getMinimumHeightToBeUpToDate()) {
if (!(getNumTrailingPeers(trailingPeerRequirements.getMinimumHeightToBeUpToDate())
< trailingPeerRequirements.getMaxTrailingPeers())) {
LOG.atTrace()
.setMessage(
"Adding trailing peer {} would exceed max trailing peers {}. Disconnecting...")
.addArgument(peer.getLoggableId())
.addArgument(trailingPeerRequirements.getMaxTrailingPeers())
.log();
peer.disconnect(DisconnectMessage.DisconnectReason.USELESS_PEER_EXCEEDS_TRAILING_PEERS);
}
}
}
}

private void checkSnapServer(final EthPeer peer) {
CompletableFuture<Void> isServingSnapFuture;
if (syncMode == SyncMode.SNAP || syncMode == SyncMode.CHECKPOINT) {
// even if we have finished the snap sync, we still want to know if the peer is a snap
// server
isServingSnapFuture =
CompletableFuture.runAsync(
() -> {
try {
checkIsSnapServer(peer);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} else {
isServingSnapFuture = CompletableFuture.completedFuture(null);
}
isServingSnapFuture.thenRun(
() -> {
if (!peer.getConnection().isDisconnected() && addPeerToEthPeers(peer)) {
connectedPeersCounter.inc();
connectCallbacks.forEach(cb -> cb.onPeerConnected(peer));
isServingSnapFuture.thenRun(
() -> {
if (!peer.getConnection().isDisconnected() && addPeerToEthPeers(peer)) {
connectedPeersCounter.inc();
connectCallbacks.forEach(cb -> cb.onPeerConnected(peer));
}
});
}
});
}

private void checkIsSnapServer(final EthPeer peer) {
private void checkIsSnapServer(final EthPeer peer, final BlockHeader peersHeadBlockHeader) {
if (peer.getAgreedCapabilities().contains(SnapProtocol.SNAP1)) {
if (snapServerChecker != null) {
// set that peer is a snap server for doing the test
peer.setIsServingSnap(true);
Boolean isServer;
try {
isServer = snapServerChecker.check(peer).get(6L, TimeUnit.SECONDS);
isServer = snapServerChecker.check(peer, peersHeadBlockHeader).get(6L, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.atTrace()
.setMessage("Error checking if peer {} is a snap server. Setting to false.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ public DefaultSynchronizer(

if (syncConfig.getSyncMode() == SyncMode.SNAP
|| syncConfig.getSyncMode() == SyncMode.CHECKPOINT) {
SnapServerChecker.createAndSetSnapServerChecker(
ethContext, metricsSystem, protocolContext.getBlockchain());
SnapServerChecker.createAndSetSnapServerChecker(ethContext, metricsSystem);
}

this.blockPropagationManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package org.hyperledger.besu.ethereum.eth.sync;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
Expand All @@ -35,29 +34,25 @@ public class SnapServerChecker {

private final EthContext ethContext;
private final MetricsSystem metricsSystem;
private final Blockchain blockchain;

private SnapServerChecker(
final EthContext ethContext, final MetricsSystem metricsSystem, final Blockchain blockchain) {
public SnapServerChecker(final EthContext ethContext, final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.metricsSystem = metricsSystem;
this.blockchain = blockchain;
}

public static void createAndSetSnapServerChecker(
final EthContext ethContext, final MetricsSystem metricsSystem, final Blockchain blockchain) {
final SnapServerChecker checker = new SnapServerChecker(ethContext, metricsSystem, blockchain);
final EthContext ethContext, final MetricsSystem metricsSystem) {
final SnapServerChecker checker = new SnapServerChecker(ethContext, metricsSystem);
ethContext.getEthPeers().setSnapServerChecker(checker);
}

public CompletableFuture<Boolean> check(final EthPeer peer) {
public CompletableFuture<Boolean> check(final EthPeer peer, final BlockHeader peersHeadHeader) {
LOG.atTrace()
.setMessage("Checking whether peer {} is a snap server ...")
.addArgument(peer::getLoggableId)
.log();
final CompletableFuture<AbstractPeerTask.PeerTaskResult<AccountRangeMessage.AccountRangeData>>
snapServerCheckCompletableFuture =
getAccountRangeFromPeer(peer, blockchain.getGenesisBlockHeader());
snapServerCheckCompletableFuture = getAccountRangeFromPeer(peer, peersHeadHeader);
final CompletableFuture<Boolean> future = new CompletableFuture<>();
snapServerCheckCompletableFuture.whenComplete(
(peerResult, error) -> {
Expand Down
Loading