Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -816,10 +816,10 @@ public Runner build() {
.subProtocols(subProtocols)
.network(p2pEnabled ? activeNetwork : inactiveNetwork)
.metricsSystem(metricsSystem)
.ethPeersShouldConnect(ethPeers::shouldTryToConnect)
.peerConnectionGatekeeper(ethPeers::gatePeerConnection)
.build();

ethPeers.setRlpxAgent(networkRunner.getRlpxAgent());
networkRunner.getRlpxAgent().ifPresent(ethPeers::setRlpxAgent);

final P2PNetwork network = networkRunner.getNetwork();
// ForkId in Ethereum Node Record needs updating when we transition to a new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerClientName;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerInfo;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
Expand Down Expand Up @@ -423,7 +424,7 @@ public Stream<PeerConnection> streamAllConnections() {
.filter(c -> !c.isDisconnected());
}

public boolean shouldTryToConnect(final Peer peer, final boolean inbound) {
public Optional<DisconnectReason> gatePeerConnection(final Peer peer, final boolean inbound) {

if (peer.getForkId().isPresent()) {
final ForkId forkId = peer.getForkId().get();
Expand All @@ -433,7 +434,7 @@ public boolean shouldTryToConnect(final Peer peer, final boolean inbound) {
.addArgument(peer::getId)
.log();

return false;
return Optional.of(DisconnectReason.USELESS_PEER_BY_CHAIN_COMPARATOR);
}
}

Expand All @@ -443,10 +444,14 @@ public boolean shouldTryToConnect(final Peer peer, final boolean inbound) {
.setMessage("not connecting to peer {} - already connected")
.addArgument(peer.getLoggableId())
.log();
return false;
return Optional.of(DisconnectReason.ALREADY_CONNECTED);
}

return peerCount() < getMaxPeers() || needMoreSnapServers() || canExceedPeerLimits(id);
if (peerCount() < getMaxPeers() || needMoreSnapServers() || canExceedPeerLimits(id)) {
return Optional.empty();
} else {
return Optional.of(DisconnectReason.TOO_MANY_PEERS);
}
}

private boolean alreadyConnectedOrConnecting(final boolean inbound, final Bytes id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.hyperledger.besu.ethereum.p2p.peers.DefaultPeer;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
Expand Down Expand Up @@ -221,7 +220,7 @@ public TestNode(
NetworkRunner.builder()
.subProtocols(EthProtocol.get())
.protocolManagers(singletonList(ethProtocolManager))
.ethPeersShouldConnect((p, d) -> true)
.peerConnectionGatekeeper((p, d) -> Optional.empty())
.network(
capabilities ->
createP2PNetwork(
Expand All @@ -234,9 +233,7 @@ public TestNode(
.metricsSystem(new NoOpMetricsSystem())
.build();
network = networkRunner.getNetwork();
final RlpxAgent rlpxAgent = network.getRlpxAgent();
rlpxAgent.subscribeConnectRequest((p, d) -> true);
ethPeers.setRlpxAgent(rlpxAgent);
network.getRlpxAgent().ifPresent(ethPeers::setRlpxAgent);
network.subscribeDisconnect(
(connection, reason, initiatedByPeer) -> disconnections.put(connection, reason));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.DisconnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.MessageCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.DefaultMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerInfo;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.ShouldConnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.util.Subscribers;

Expand Down Expand Up @@ -174,9 +174,6 @@ public void subscribeConnect(final ConnectCallback callback) {
connectCallbacks.subscribe(callback);
}

@Override
public void subscribeConnectRequest(final ShouldConnectCallback callback) {}

@Override
public void subscribeDisconnect(final DisconnectCallback callback) {
disconnectCallbacks.subscribe(callback);
Expand Down Expand Up @@ -236,6 +233,11 @@ public Optional<EnodeURLImpl> getLocalEnode() {

@Override
public void updateNodeRecord() {}

@Override
public Optional<RlpxAgent> getRlpxAgent() {
return Optional.empty();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerLookup;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.ShouldConnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.nat.NatMethod;
import org.hyperledger.besu.nat.NatService;
Expand Down Expand Up @@ -337,8 +336,8 @@ public void awaitStop() {
}

@Override
public RlpxAgent getRlpxAgent() {
return rlpxAgent;
public Optional<RlpxAgent> getRlpxAgent() {
return Optional.of(rlpxAgent);
}

@Override
Expand Down Expand Up @@ -440,11 +439,6 @@ public void subscribeConnect(final ConnectCallback callback) {
rlpxAgent.subscribeConnect(callback);
}

@Override
public void subscribeConnectRequest(final ShouldConnectCallback callback) {
rlpxAgent.subscribeConnectRequest(callback);
}

@Override
public void subscribeDisconnect(final DisconnectCallback callback) {
rlpxAgent.subscribeDisconnect(callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
Expand All @@ -50,18 +51,15 @@ public class NetworkRunner implements AutoCloseable {
private final List<ProtocolManager> protocolManagers;
private final LabelledMetric<Counter> inboundMessageCounter;
private final LabelledMetric<Counter> inboundBytesCounter;
private final BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect;

private NetworkRunner(
final P2PNetwork network,
final Map<String, SubProtocol> subProtocols,
final List<ProtocolManager> protocolManagers,
final MetricsSystem metricsSystem,
final BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect) {
final MetricsSystem metricsSystem) {
this.network = network;
this.protocolManagers = protocolManagers;
this.subProtocols = subProtocols;
this.ethPeersShouldConnect = ethPeersShouldConnect;
this.inboundMessageCounter =
metricsSystem.createLabelledCounter(
BesuMetricCategory.NETWORK,
Expand Down Expand Up @@ -178,8 +176,6 @@ private void setupHandlers() {
protocolManager.handleNewConnection(connection);
});

network.subscribeConnectRequest(ethPeersShouldConnect::apply);

network.subscribeDisconnect(
(connection, disconnectReason, initiatedByPeer) -> {
if (Collections.disjoint(
Expand All @@ -196,7 +192,7 @@ public void close() {
stop();
}

public RlpxAgent getRlpxAgent() {
public Optional<RlpxAgent> getRlpxAgent() {
return network.getRlpxAgent();
}

Expand All @@ -205,7 +201,8 @@ public static class Builder {
List<ProtocolManager> protocolManagers = new ArrayList<>();
List<SubProtocol> subProtocols = new ArrayList<>();
MetricsSystem metricsSystem;
private BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect;
private BiFunction<Peer, Boolean, Optional<DisconnectReason>> peerConnectionGatekeeper =
(peer, incoming) -> Optional.empty();

public NetworkRunner build() {
final Map<String, SubProtocol> subProtocolMap = new HashMap<>();
Expand All @@ -223,8 +220,10 @@ public NetworkRunner build() {
}
}
final P2PNetwork network = networkProvider.build(caps);
return new NetworkRunner(
network, subProtocolMap, protocolManagers, metricsSystem, ethPeersShouldConnect);
network
.getRlpxAgent()
.ifPresent(agent -> agent.setPeerConnectionGatekeeper(peerConnectionGatekeeper::apply));
return new NetworkRunner(network, subProtocolMap, protocolManagers, metricsSystem);
}

public Builder protocolManagers(final List<ProtocolManager> protocolManagers) {
Expand Down Expand Up @@ -252,8 +251,9 @@ public Builder metricsSystem(final MetricsSystem metricsSystem) {
return this;
}

public Builder ethPeersShouldConnect(final BiFunction<Peer, Boolean, Boolean> shouldConnect) {
this.ethPeersShouldConnect = shouldConnect;
public Builder peerConnectionGatekeeper(
final BiFunction<Peer, Boolean, Optional<DisconnectReason>> gatekeeper) {
this.peerConnectionGatekeeper = gatekeeper;
return this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.DisconnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.MessageCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.RlpxAgent;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.ShouldConnectCallback;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -55,9 +55,6 @@ public void subscribe(final Capability capability, final MessageCallback callbac
@Override
public void subscribeConnect(final ConnectCallback callback) {}

@Override
public void subscribeConnectRequest(final ShouldConnectCallback callback) {}

@Override
public void subscribeDisconnect(final DisconnectCallback callback) {}

Expand Down Expand Up @@ -115,4 +112,9 @@ public void close() throws IOException {}

@Override
public void start() {}

@Override
public Optional<RlpxAgent> getRlpxAgent() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.ShouldConnectCallback;

import java.io.Closeable;
import java.util.Collection;
Expand Down Expand Up @@ -87,8 +86,6 @@ default int getPeerCount() {
*/
void subscribeConnect(final ConnectCallback callback);

void subscribeConnectRequest(final ShouldConnectCallback callback);

/**
* Subscribe a {@link Consumer} to all incoming new Peer disconnect events.
*
Expand Down Expand Up @@ -173,7 +170,5 @@ default Optional<String> getLocalEnr() {
return Optional.empty();
}

default RlpxAgent getRlpxAgent() {
return null;
}
Optional<RlpxAgent> getRlpxAgent();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerRlpxPermissions;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty.NettyConnectionInitializer;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.ShouldConnectCallback;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.PeerConnectionGatekeeper;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.data.EnodeURL;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.Subscribers;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -65,7 +64,7 @@ public class RlpxAgent {
private final PeerConnectionEvents connectionEvents;
private final ConnectionInitializer connectionInitializer;
private final Subscribers<ConnectCallback> connectSubscribers = Subscribers.create();
private final List<ShouldConnectCallback> connectRequestSubscribers = new ArrayList<>();
private volatile PeerConnectionGatekeeper peerConnectionGatekeeper;
private final PeerRlpxPermissions peerPermissions;
private final PeerPrivileges peerPrivileges;
private final AtomicBoolean started = new AtomicBoolean(false);
Expand Down Expand Up @@ -233,7 +232,9 @@ public CompletableFuture<PeerConnection> connect(final Peer peer) {
}

final CompletableFuture<PeerConnection> peerConnectionCompletableFuture;
if (checkWhetherToConnect(peer, false)) {
final Optional<DisconnectReason> maybeDisconnectReason = gatePeerConnection(peer, false);

if (maybeDisconnectReason.isEmpty()) {
try {
synchronized (this) {
peerConnectionCompletableFuture =
Expand Down Expand Up @@ -266,9 +267,10 @@ private CompletableFuture<PeerConnection> createPeerConnectionCompletableFuture(
return peerConnectionCompletableFuture;
}

private boolean checkWhetherToConnect(final Peer peer, final boolean incoming) {
return connectRequestSubscribers.stream()
.anyMatch(callback -> callback.shouldConnect(peer, incoming));
private Optional<DisconnectReason> gatePeerConnection(final Peer peer, final boolean incoming) {
return peerConnectionGatekeeper != null
? peerConnectionGatekeeper.checkPeerConnection(peer, incoming)
: Optional.empty();
}

private void setupListeners() {
Expand Down Expand Up @@ -344,10 +346,11 @@ private void handleIncomingConnection(final PeerConnection peerConnection) {
return;
}

if (checkWhetherToConnect(peer, true)) {
final Optional<DisconnectReason> maybeDisconnectReason = gatePeerConnection(peer, true);
if (maybeDisconnectReason.isEmpty()) {
dispatchConnect(peerConnection);
} else {
peerConnection.disconnect(DisconnectReason.UNKNOWN);
peerConnection.disconnect(maybeDisconnectReason.get());
}
}

Expand All @@ -359,8 +362,8 @@ public void subscribeConnect(final ConnectCallback callback) {
connectSubscribers.subscribe(callback);
}

public void subscribeConnectRequest(final ShouldConnectCallback callback) {
connectRequestSubscribers.add(callback);
public void setPeerConnectionGatekeeper(final PeerConnectionGatekeeper gatekeeper) {
this.peerConnectionGatekeeper = gatekeeper;
}

public void subscribeDisconnect(final DisconnectCallback callback) {
Expand Down
Loading