diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java index cf6aa9d0476..6fc3eaf9f64 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java @@ -42,7 +42,6 @@ import java.time.Clock; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -632,12 +631,6 @@ public boolean hasSupportForMessage(final int messageCode) { .anyMatch(cap -> EthProtocol.get().isValidMessageCode(cap.getVersion(), messageCode)); } - public Capability getMaxAgreedCapability() { - return getAgreedCapabilities().stream() - .max(Comparator.comparingInt(Capability::getVersion)) - .orElseThrow(() -> new IllegalStateException("No capabilities available")); - } - @Override public String toString() { return String.format( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index ba8c623020d..b856f2cc2f6 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -29,6 +29,7 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.p2p.peers.Peer; import org.hyperledger.besu.ethereum.p2p.peers.PeerId; +import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback; import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent; import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerClientName; @@ -556,95 +557,80 @@ public String toString() { private void ethPeerStatusExchanged(final EthPeer peer) { // We have a connection to a peer that is on the right chain and is willing to connect to us. + // Find out what the EthPeer block height is and whether it can serve snap data (if we are doing + // snap sync) LOG.debug("Peer {} status exchanged", peer); assert tracker != null : "ChainHeadTracker must be set before EthPeers can be used"; + CompletableFuture future = tracker.getBestHeaderFromPeer(peer); + + future.whenComplete( + (peerHeadBlockHeader, error) -> { + if (peerHeadBlockHeader == null) { + LOG.debug( + "Failed to retrieve chain head info. Disconnecting {}... {}", + peer.getLoggableId(), + error); + peer.disconnect( + DisconnectMessage.DisconnectReason.USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_HEAD); + } else { + + // we can check trailing peers now + final TrailingPeerRequirements trailingPeerRequirements = + trailingPeerRequirementsSupplier.get(); + if (trailingPeerRequirements != null) { + if (peer.chainState().getEstimatedHeight() + < trailingPeerRequirements.getMinimumHeightToBeUpToDate()) { + if (!(getNumTrailingPeers(trailingPeerRequirements.getMinimumHeightToBeUpToDate()) + < trailingPeerRequirements.getMaxTrailingPeers())) { + LOG.atTrace() + .setMessage( + "Adding trailing peer {} would exceed max trailing peers {}. Disconnecting...") + .addArgument(peer.getLoggableId()) + .addArgument(trailingPeerRequirements.getMaxTrailingPeers()) + .log(); + peer.disconnect( + DisconnectMessage.DisconnectReason.USELESS_PEER_EXCEEDS_TRAILING_PEERS); + return; + } + } + } - // Handle chain head tracking based on protocol compatibility. - if (EthProtocol.isEth69Compatible(peer.getMaxAgreedCapability())) { - // For Eth69-compatible peers, the chain head information is included in the status message. - // Skip fetching the chain head separately and directly check trailing peer requirements. - checkTrailingPeerRequirements(peer); - } else { - // For non-Eth69-compatible peers, fetch the chain head before checking trailing requirements. - CompletableFuture future = tracker.getBestHeaderFromPeer(peer); - future.whenComplete( - (peerHeadBlockHeader, error) -> { - // If we successfully retrieved the chain head, check trailing peer requirements and - // update - // the peer's estimated height. - if (peerHeadBlockHeader != null) { - checkTrailingPeerRequirements(peer); - peer.chainState().updateHeightEstimate(peerHeadBlockHeader.getNumber()); + peer.chainState().updateHeightEstimate(peerHeadBlockHeader.getNumber()); + CompletableFuture isServingSnapFuture; + if (syncMode == SyncMode.SNAP || syncMode == SyncMode.CHECKPOINT) { + // even if we have finished the snap sync, we still want to know if the peer is a snap + // server + isServingSnapFuture = + CompletableFuture.runAsync( + () -> { + try { + checkIsSnapServer(peer, peerHeadBlockHeader); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } else { - LOG.debug( - "Failed to retrieve chain head info. Disconnecting {}... {}", - peer.getLoggableId(), - error); - peer.disconnect( - DisconnectMessage.DisconnectReason.USELESS_PEER_FAILED_TO_RETRIEVE_CHAIN_HEAD); + isServingSnapFuture = CompletableFuture.completedFuture(null); } - }); - } - - // If we are doing snap sync, check if the peer is a snap server - checkSnapServer(peer); - } - - private void checkTrailingPeerRequirements(final EthPeer peer) { - // we can check trailing peers now - final TrailingPeerRequirements trailingPeerRequirements = - trailingPeerRequirementsSupplier.get(); - if (trailingPeerRequirements != null) { - if (peer.chainState().getEstimatedHeight() - < trailingPeerRequirements.getMinimumHeightToBeUpToDate()) { - if (!(getNumTrailingPeers(trailingPeerRequirements.getMinimumHeightToBeUpToDate()) - < trailingPeerRequirements.getMaxTrailingPeers())) { - LOG.atTrace() - .setMessage( - "Adding trailing peer {} would exceed max trailing peers {}. Disconnecting...") - .addArgument(peer.getLoggableId()) - .addArgument(trailingPeerRequirements.getMaxTrailingPeers()) - .log(); - peer.disconnect(DisconnectMessage.DisconnectReason.USELESS_PEER_EXCEEDS_TRAILING_PEERS); - } - } - } - } - - private void checkSnapServer(final EthPeer peer) { - CompletableFuture isServingSnapFuture; - if (syncMode == SyncMode.SNAP || syncMode == SyncMode.CHECKPOINT) { - // even if we have finished the snap sync, we still want to know if the peer is a snap - // server - isServingSnapFuture = - CompletableFuture.runAsync( - () -> { - try { - checkIsSnapServer(peer); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } else { - isServingSnapFuture = CompletableFuture.completedFuture(null); - } - isServingSnapFuture.thenRun( - () -> { - if (!peer.getConnection().isDisconnected() && addPeerToEthPeers(peer)) { - connectedPeersCounter.inc(); - connectCallbacks.forEach(cb -> cb.onPeerConnected(peer)); + isServingSnapFuture.thenRun( + () -> { + if (!peer.getConnection().isDisconnected() && addPeerToEthPeers(peer)) { + connectedPeersCounter.inc(); + connectCallbacks.forEach(cb -> cb.onPeerConnected(peer)); + } + }); } }); } - private void checkIsSnapServer(final EthPeer peer) { + private void checkIsSnapServer(final EthPeer peer, final BlockHeader peersHeadBlockHeader) { if (peer.getAgreedCapabilities().contains(SnapProtocol.SNAP1)) { if (snapServerChecker != null) { // set that peer is a snap server for doing the test peer.setIsServingSnap(true); Boolean isServer; try { - isServer = snapServerChecker.check(peer).get(6L, TimeUnit.SECONDS); + isServer = snapServerChecker.check(peer, peersHeadBlockHeader).get(6L, TimeUnit.SECONDS); } catch (Exception e) { LOG.atTrace() .setMessage("Error checking if peer {} is a snap server. Setting to false.") diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 969e58afc29..4dc6d0060b2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -106,8 +106,7 @@ public DefaultSynchronizer( if (syncConfig.getSyncMode() == SyncMode.SNAP || syncConfig.getSyncMode() == SyncMode.CHECKPOINT) { - SnapServerChecker.createAndSetSnapServerChecker( - ethContext, metricsSystem, protocolContext.getBlockchain()); + SnapServerChecker.createAndSetSnapServerChecker(ethContext, metricsSystem); } this.blockPropagationManager = diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SnapServerChecker.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SnapServerChecker.java index 0b51323ad9f..c7fa141837b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SnapServerChecker.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SnapServerChecker.java @@ -15,7 +15,6 @@ package org.hyperledger.besu.ethereum.eth.sync; import org.hyperledger.besu.datatypes.Hash; -import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; @@ -35,29 +34,25 @@ public class SnapServerChecker { private final EthContext ethContext; private final MetricsSystem metricsSystem; - private final Blockchain blockchain; - private SnapServerChecker( - final EthContext ethContext, final MetricsSystem metricsSystem, final Blockchain blockchain) { + public SnapServerChecker(final EthContext ethContext, final MetricsSystem metricsSystem) { this.ethContext = ethContext; this.metricsSystem = metricsSystem; - this.blockchain = blockchain; } public static void createAndSetSnapServerChecker( - final EthContext ethContext, final MetricsSystem metricsSystem, final Blockchain blockchain) { - final SnapServerChecker checker = new SnapServerChecker(ethContext, metricsSystem, blockchain); + final EthContext ethContext, final MetricsSystem metricsSystem) { + final SnapServerChecker checker = new SnapServerChecker(ethContext, metricsSystem); ethContext.getEthPeers().setSnapServerChecker(checker); } - public CompletableFuture check(final EthPeer peer) { + public CompletableFuture check(final EthPeer peer, final BlockHeader peersHeadHeader) { LOG.atTrace() .setMessage("Checking whether peer {} is a snap server ...") .addArgument(peer::getLoggableId) .log(); final CompletableFuture> - snapServerCheckCompletableFuture = - getAccountRangeFromPeer(peer, blockchain.getGenesisBlockHeader()); + snapServerCheckCompletableFuture = getAccountRangeFromPeer(peer, peersHeadHeader); final CompletableFuture future = new CompletableFuture<>(); snapServerCheckCompletableFuture.whenComplete( (peerResult, error) -> {