Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ public Runner build() {
.ethPeersShouldConnect(ethPeers::shouldTryToConnect)
.build();

ethPeers.setRlpxAgent(networkRunner.getRlpxAgent());
networkRunner.getRlpxAgent().ifPresent(ethPeers::setRlpxAgent);

final P2PNetwork network = networkRunner.getNetwork();
// ForkId in Ethereum Node Record needs updating when we transition to a new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerClientName;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerInfo;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
Expand Down Expand Up @@ -423,7 +424,7 @@ public Stream<PeerConnection> streamAllConnections() {
.filter(c -> !c.isDisconnected());
}

public boolean shouldTryToConnect(final Peer peer, final boolean inbound) {
public Optional<DisconnectReason> shouldTryToConnect(final Peer peer, final boolean inbound) {

if (peer.getForkId().isPresent()) {
final ForkId forkId = peer.getForkId().get();
Expand All @@ -433,7 +434,7 @@ public boolean shouldTryToConnect(final Peer peer, final boolean inbound) {
.addArgument(peer::getId)
.log();

return false;
return Optional.of(DisconnectReason.USELESS_PEER_BY_CHAIN_COMPARATOR);
}
}

Expand All @@ -443,10 +444,14 @@ public boolean shouldTryToConnect(final Peer peer, final boolean inbound) {
.setMessage("not connecting to peer {} - already connected")
.addArgument(peer.getLoggableId())
.log();
return false;
return Optional.of(DisconnectReason.ALREADY_CONNECTED);
}

return peerCount() < getMaxPeers() || needMoreSnapServers() || canExceedPeerLimits(id);
if (peerCount() < getMaxPeers() || needMoreSnapServers() || canExceedPeerLimits(id)) {
return Optional.empty();
} else {
return Optional.of(DisconnectReason.TOO_MANY_PEERS);
}
}

private boolean alreadyConnectedOrConnecting(final boolean inbound, final Bytes id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.hyperledger.besu.ethereum.p2p.peers.DefaultPeer;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
Expand Down Expand Up @@ -221,7 +220,7 @@ public TestNode(
NetworkRunner.builder()
.subProtocols(EthProtocol.get())
.protocolManagers(singletonList(ethProtocolManager))
.ethPeersShouldConnect((p, d) -> true)
.ethPeersShouldConnect((p, d) -> Optional.empty())
.network(
capabilities ->
createP2PNetwork(
Expand All @@ -234,9 +233,7 @@ public TestNode(
.metricsSystem(new NoOpMetricsSystem())
.build();
network = networkRunner.getNetwork();
final RlpxAgent rlpxAgent = network.getRlpxAgent();
rlpxAgent.subscribeConnectRequest((p, d) -> true);
ethPeers.setRlpxAgent(rlpxAgent);
network.getRlpxAgent().ifPresent(ethPeers::setRlpxAgent);
network.subscribeDisconnect(
(connection, reason, initiatedByPeer) -> disconnections.put(connection, reason));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.DisconnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.MessageCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.DefaultMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerInfo;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.ShouldConnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.data.EnodeURL;
import org.hyperledger.besu.util.Subscribers;
Expand Down Expand Up @@ -174,9 +174,6 @@ public void subscribeConnect(final ConnectCallback callback) {
connectCallbacks.subscribe(callback);
}

@Override
public void subscribeConnectRequest(final ShouldConnectCallback callback) {}

@Override
public void subscribeDisconnect(final DisconnectCallback callback) {
disconnectCallbacks.subscribe(callback);
Expand Down Expand Up @@ -236,6 +233,11 @@ public Optional<EnodeURL> getLocalEnode() {

@Override
public void updateNodeRecord() {}

@Override
public Optional<RlpxAgent> getRlpxAgent() {
return Optional.empty();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerLookup;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.ShouldConnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.nat.NatMethod;
import org.hyperledger.besu.nat.NatService;
Expand Down Expand Up @@ -315,8 +314,8 @@ public void awaitStop() {
}

@Override
public RlpxAgent getRlpxAgent() {
return rlpxAgent;
public Optional<RlpxAgent> getRlpxAgent() {
return Optional.of(rlpxAgent);
}

@Override
Expand Down Expand Up @@ -418,11 +417,6 @@ public void subscribeConnect(final ConnectCallback callback) {
rlpxAgent.subscribeConnect(callback);
}

@Override
public void subscribeConnectRequest(final ShouldConnectCallback callback) {
rlpxAgent.subscribeConnectRequest(callback);
}

@Override
public void subscribeDisconnect(final DisconnectCallback callback) {
rlpxAgent.subscribeDisconnect(callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
Expand All @@ -50,18 +51,15 @@ public class NetworkRunner implements AutoCloseable {
private final List<ProtocolManager> protocolManagers;
private final LabelledMetric<Counter> inboundMessageCounter;
private final LabelledMetric<Counter> inboundBytesCounter;
private final BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect;

private NetworkRunner(
final P2PNetwork network,
final Map<String, SubProtocol> subProtocols,
final List<ProtocolManager> protocolManagers,
final MetricsSystem metricsSystem,
final BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect) {
final MetricsSystem metricsSystem) {
this.network = network;
this.protocolManagers = protocolManagers;
this.subProtocols = subProtocols;
this.ethPeersShouldConnect = ethPeersShouldConnect;
this.inboundMessageCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.NETWORK,
Expand Down Expand Up @@ -178,8 +176,6 @@ private void setupHandlers() {
protocolManager.handleNewConnection(connection);
});

network.subscribeConnectRequest(ethPeersShouldConnect::apply);

network.subscribeDisconnect(
(connection, disconnectReason, initiatedByPeer) -> {
if (Collections.disjoint(
Expand All @@ -196,7 +192,7 @@ public void close() {
stop();
}

public RlpxAgent getRlpxAgent() {
public Optional<RlpxAgent> getRlpxAgent() {
return network.getRlpxAgent();
}

Expand All @@ -205,7 +201,7 @@ public static class Builder {
List<ProtocolManager> protocolManagers = new ArrayList<>();
List<SubProtocol> subProtocols = new ArrayList<>();
MetricsSystem metricsSystem;
private BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect;
private BiFunction<Peer, Boolean, Optional<DisconnectReason>> ethPeersShouldConnect;

public NetworkRunner build() {
final Map<String, SubProtocol> subProtocolMap = new HashMap<>();
Expand All @@ -223,8 +219,10 @@ public NetworkRunner build() {
}
}
final P2PNetwork network = networkProvider.build(caps);
return new NetworkRunner(
network, subProtocolMap, protocolManagers, metricsSystem, ethPeersShouldConnect);
network
.getRlpxAgent()
.ifPresent(agent -> agent.setShouldConnectCallback(ethPeersShouldConnect::apply));
return new NetworkRunner(network, subProtocolMap, protocolManagers, metricsSystem);
}

public Builder protocolManagers(final List<ProtocolManager> protocolManagers) {
Expand Down Expand Up @@ -252,7 +250,8 @@ public Builder metricsSystem(final MetricsSystem metricsSystem) {
return this;
}

public Builder ethPeersShouldConnect(final BiFunction<Peer, Boolean, Boolean> shouldConnect) {
public Builder ethPeersShouldConnect(
final BiFunction<Peer, Boolean, Optional<DisconnectReason>> shouldConnect) {
this.ethPeersShouldConnect = shouldConnect;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.DisconnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.MessageCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.ShouldConnectCallback;
import org.hyperledger.besu.plugin.data.EnodeURL;

import java.io.IOException;
Expand Down Expand Up @@ -55,9 +55,6 @@ public void subscribe(final Capability capability, final MessageCallback callbac
@Override
public void subscribeConnect(final ConnectCallback callback) {}

@Override
public void subscribeConnectRequest(final ShouldConnectCallback callback) {}

@Override
public void subscribeDisconnect(final DisconnectCallback callback) {}

Expand Down Expand Up @@ -115,4 +112,9 @@ public void close() throws IOException {}

@Override
public void start() {}

@Override
public Optional<RlpxAgent> getRlpxAgent() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.ShouldConnectCallback;
import org.hyperledger.besu.plugin.data.EnodeURL;

import java.io.Closeable;
Expand Down Expand Up @@ -87,8 +86,6 @@ default int getPeerCount() {
*/
void subscribeConnect(final ConnectCallback callback);

void subscribeConnectRequest(final ShouldConnectCallback callback);

/**
* Subscribe a {@link Consumer} to all incoming new Peer disconnect events.
*
Expand Down Expand Up @@ -164,7 +161,5 @@ default int getPeerCount() {

void updateNodeRecord();

default RlpxAgent getRlpxAgent() {
return null;
}
Optional<RlpxAgent> getRlpxAgent();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -65,7 +64,7 @@ public class RlpxAgent {
private final PeerConnectionEvents connectionEvents;
private final ConnectionInitializer connectionInitializer;
private final Subscribers<ConnectCallback> connectSubscribers = Subscribers.create();
private final List<ShouldConnectCallback> connectRequestSubscribers = new ArrayList<>();
private ShouldConnectCallback shouldConnectCallback;
private final PeerRlpxPermissions peerPermissions;
private final PeerPrivileges peerPrivileges;
private final AtomicBoolean started = new AtomicBoolean(false);
Expand Down Expand Up @@ -233,7 +232,9 @@ public CompletableFuture<PeerConnection> connect(final Peer peer) {
}

final CompletableFuture<PeerConnection> peerConnectionCompletableFuture;
if (checkWhetherToConnect(peer, false)) {
final Optional<DisconnectReason> maybeDisconnectReason = checkWhetherToConnect(peer, false);

if (maybeDisconnectReason.isEmpty()) {
try {
synchronized (this) {
peerConnectionCompletableFuture =
Expand Down Expand Up @@ -266,9 +267,11 @@ private CompletableFuture<PeerConnection> createPeerConnectionCompletableFuture(
return peerConnectionCompletableFuture;
}

private boolean checkWhetherToConnect(final Peer peer, final boolean incoming) {
return connectRequestSubscribers.stream()
.anyMatch(callback -> callback.shouldConnect(peer, incoming));
private Optional<DisconnectReason> checkWhetherToConnect(
final Peer peer, final boolean incoming) {
return shouldConnectCallback != null
? shouldConnectCallback.shouldConnect(peer, incoming)
: Optional.empty();
}

private void setupListeners() {
Expand Down Expand Up @@ -344,10 +347,11 @@ private void handleIncomingConnection(final PeerConnection peerConnection) {
return;
}

if (checkWhetherToConnect(peer, true)) {
final Optional<DisconnectReason> maybeDisconnectReason = checkWhetherToConnect(peer, true);
if (maybeDisconnectReason.isEmpty()) {
dispatchConnect(peerConnection);
} else {
peerConnection.disconnect(DisconnectReason.UNKNOWN);
peerConnection.disconnect(maybeDisconnectReason.get());
}
}

Expand All @@ -359,8 +363,8 @@ public void subscribeConnect(final ConnectCallback callback) {
connectSubscribers.subscribe(callback);
}

public void subscribeConnectRequest(final ShouldConnectCallback callback) {
connectRequestSubscribers.add(callback);
public void setShouldConnectCallback(final ShouldConnectCallback callback) {
this.shouldConnectCallback = callback;
}

public void subscribeDisconnect(final DisconnectCallback callback) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
package org.hyperledger.besu.ethereum.p2p.rlpx.wire;

import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;

import java.util.Optional;

@FunctionalInterface
public interface ShouldConnectCallback {

boolean shouldConnect(final Peer peer, final boolean incoming);
Optional<DisconnectMessage.DisconnectReason> shouldConnect(
final Peer peer, final boolean incoming);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;

import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -84,8 +86,10 @@ public void setUp() {

// Setup network mocks to allow start() to complete
when(network.isListening()).thenReturn(true);
when(network.getRlpxAgent()).thenReturn(Optional.empty());

BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect = (peer, incoming) -> true;
BiFunction<Peer, Boolean, Optional<DisconnectMessage.DisconnectReason>> ethPeersShouldConnect =
(peer, incoming) -> Optional.empty();

NetworkRunner.NetworkBuilder networkBuilder = caps -> network;

Expand Down
Loading