Skip to content

Commit c581f2c

Browse files
committed
Add byte-level metrics for P2P message exchange
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net> # Conflicts: # CHANGELOG.md
1 parent 82ce653 commit c581f2c

File tree

8 files changed

+404
-3
lines changed

8 files changed

+404
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
- Add ability to pass a custom tracer to block simulation [#9708](https://github.com/hyperledger/besu/pull/9708)
4040
- Add support for `4byteTracer` in `debug_trace*` methods to collect function selectors from internal calls via PR [#9642](https://github.com/hyperledger/besu/pull/9642). Thanks to [@JukLee0ira](https://github.com/JukLee0ira).
4141
- Update assertj to v3.27.7 [#9710](https://github.com/hyperledger/besu/pull/9710)
42+
- Add byte-level metrics for P2P message exchange [#9666](https://github.com/hyperledger/besu/pull/9666)
4243

4344
### Bug fixes
4445
- Fix callTracer handling of failed CREATE operations, including correct input field extraction and proper error reporting for both soft failures and revert reasons
@@ -76,7 +77,6 @@
7677
- Add `engine_getBlobsV3` method [#9582](https://github.com/hyperledger/besu/pull/9582)
7778
- Verify plugins on start [#9601](https://github.com/hyperledger/besu/pull/9601)
7879
- Add EIP-7778 to Amsterdam [#9664](https://github.com/hyperledger/besu/pull/9664)
79-
- Treat EndOfRLPException as invalid packet in peer discovery, meaning you will no longer see these exceptions, unless you enable DEBUG logs [#9597](https://github.com/hyperledger/besu/pull/9597)
8080

8181
### Bug fixes
8282
- Fix promotion to prioritized layer for gas price fee markets [#9635](https://github.com/hyperledger/besu/pull/9635)

ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/NetworkRunner.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class NetworkRunner implements AutoCloseable {
4949
private final Map<String, SubProtocol> subProtocols;
5050
private final List<ProtocolManager> protocolManagers;
5151
private final LabelledMetric<Counter> inboundMessageCounter;
52+
private final LabelledMetric<Counter> inboundBytesCounter;
5253
private final BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect;
5354

5455
private NetworkRunner(
@@ -61,14 +62,22 @@ private NetworkRunner(
6162
this.protocolManagers = protocolManagers;
6263
this.subProtocols = subProtocols;
6364
this.ethPeersShouldConnect = ethPeersShouldConnect;
64-
inboundMessageCounter =
65+
this.inboundMessageCounter =
6566
metricsSystem.createLabelledCounter(
6667
BesuMetricCategory.NETWORK,
6768
"p2p_messages_inbound",
6869
"Count of each P2P message received inbound.",
6970
"protocol",
7071
"name",
7172
"code");
73+
this.inboundBytesCounter =
74+
metricsSystem.createLabelledCounter(
75+
BesuMetricCategory.NETWORK,
76+
"p2p_bytes_inbound",
77+
"Count of bytes received inbound.",
78+
"protocol",
79+
"name",
80+
"code");
7281
}
7382

7483
public P2PNetwork getNetwork() {
@@ -147,6 +156,12 @@ private void setupHandlers() {
147156
protocol.messageName(cap.getVersion(), code),
148157
Integer.toString(code))
149158
.inc();
159+
inboundBytesCounter
160+
.labels(
161+
cap.toString(),
162+
protocol.messageName(cap.getVersion(), code),
163+
Integer.toString(code))
164+
.inc(message.getData().getSize());
150165
protocolManager.processMessage(cap, message);
151166
});
152167
}

ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/AbstractPeerConnection.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public abstract class AbstractPeerConnection implements PeerConnection {
5454
private final AtomicBoolean terminatedImmediately = new AtomicBoolean(false);
5555
protected final PeerConnectionEventDispatcher connectionEventDispatcher;
5656
private final LabelledMetric<Counter> outboundMessagesCounter;
57+
private final LabelledMetric<Counter> outboundBytesCounter;
5758
private final long initiatedAt;
5859
private final boolean inboundInitiated;
5960
private boolean statusSent;
@@ -68,6 +69,7 @@ protected AbstractPeerConnection(
6869
final CapabilityMultiplexer multiplexer,
6970
final PeerConnectionEventDispatcher connectionEventDispatcher,
7071
final LabelledMetric<Counter> outboundMessagesCounter,
72+
final LabelledMetric<Counter> outboundBytesCounter,
7173
final boolean inboundInitiated) {
7274
this.peer = peer;
7375
this.peerInfo = peerInfo;
@@ -82,6 +84,7 @@ protected AbstractPeerConnection(
8284
}
8385
this.connectionEventDispatcher = connectionEventDispatcher;
8486
this.outboundMessagesCounter = outboundMessagesCounter;
87+
this.outboundBytesCounter = outboundBytesCounter;
8588
this.inboundInitiated = inboundInitiated;
8689
this.initiatedAt = System.currentTimeMillis();
8790

@@ -118,13 +121,26 @@ private void doSend(final Capability capability, final MessageData message) {
118121
subProtocol.messageName(capability.getVersion(), message.getCode()),
119122
Integer.toString(message.getCode()))
120123
.inc();
124+
outboundBytesCounter
125+
.labels(
126+
capability.toString(),
127+
subProtocol.messageName(capability.getVersion(), message.getCode()),
128+
Integer.toString(message.getCode()))
129+
.inc(message.getSize());
130+
121131
} else {
122132
outboundMessagesCounter
123133
.labels(
124134
"Wire",
125135
WireMessageCodes.messageName(message.getCode()),
126136
Integer.toString(message.getCode()))
127137
.inc();
138+
outboundBytesCounter
139+
.labels(
140+
"Wire",
141+
WireMessageCodes.messageName(message.getCode()),
142+
Integer.toString(message.getCode()))
143+
.inc(message.getSize());
128144
}
129145

130146
LOG.atTrace()

ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/DeFramer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ final class DeFramer extends ByteToMessageDecoder {
7474
private final PeerLookup peerLookup;
7575
private boolean hellosExchanged;
7676
private final LabelledMetric<Counter> outboundMessagesCounter;
77+
private final LabelledMetric<Counter> outboundBytesCounter;
7778

7879
DeFramer(
7980
final Framer framer,
@@ -101,6 +102,14 @@ final class DeFramer extends ByteToMessageDecoder {
101102
"protocol",
102103
"name",
103104
"code");
105+
this.outboundBytesCounter =
106+
metricsSystem.createLabelledCounter(
107+
BesuMetricCategory.NETWORK,
108+
"p2p_bytes_outbound",
109+
"Count of bytes sent outbound.",
110+
"protocol",
111+
"name",
112+
"code");
104113
}
105114

106115
@Override
@@ -165,6 +174,7 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L
165174
capabilityMultiplexer,
166175
connectionEventDispatcher,
167176
outboundMessagesCounter,
177+
outboundBytesCounter,
168178
inboundInitiated);
169179

170180
// Check peer is who we expected

ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/connections/netty/NettyPeerConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public NettyPeerConnection(
4444
final CapabilityMultiplexer multiplexer,
4545
final PeerConnectionEventDispatcher connectionEventDispatcher,
4646
final LabelledMetric<Counter> outboundMessagesCounter,
47+
final LabelledMetric<Counter> outboundBytesCounter,
4748
final boolean inboundInitiated) {
4849
super(
4950
peer,
@@ -54,6 +55,7 @@ public NettyPeerConnection(
5455
multiplexer,
5556
connectionEventDispatcher,
5657
outboundMessagesCounter,
58+
outboundBytesCounter,
5759
inboundInitiated);
5860

5961
this.ctx = ctx;
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/*
2+
* Copyright contributors to Hyperledger Besu.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*
13+
* SPDX-License-Identifier: Apache-2.0
14+
*/
15+
package org.hyperledger.besu.ethereum.p2p.network;
16+
17+
import static org.mockito.ArgumentMatchers.any;
18+
import static org.mockito.ArgumentMatchers.anyInt;
19+
import static org.mockito.ArgumentMatchers.anyLong;
20+
import static org.mockito.ArgumentMatchers.anyString;
21+
import static org.mockito.ArgumentMatchers.eq;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.times;
24+
import static org.mockito.Mockito.verify;
25+
import static org.mockito.Mockito.when;
26+
27+
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
28+
import org.hyperledger.besu.ethereum.p2p.rlpx.MessageCallback;
29+
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
30+
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
31+
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.DefaultMessage;
32+
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
33+
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
34+
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
35+
import org.hyperledger.besu.plugin.services.MetricsSystem;
36+
import org.hyperledger.besu.plugin.services.metrics.Counter;
37+
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
38+
39+
import java.util.List;
40+
import java.util.function.BiFunction;
41+
42+
import org.junit.jupiter.api.BeforeEach;
43+
import org.junit.jupiter.api.Test;
44+
import org.mockito.ArgumentCaptor;
45+
46+
public class NetworkRunnerTest {
47+
48+
private NetworkRunner networkRunner;
49+
private P2PNetwork network;
50+
private LabelledMetric<Counter> inboundMessageCounter;
51+
private LabelledMetric<Counter> inboundBytesCounter;
52+
private Counter messageCounter;
53+
private Counter bytesCounter;
54+
private SubProtocol subProtocol;
55+
private ProtocolManager protocolManager;
56+
57+
@BeforeEach
58+
@SuppressWarnings("unchecked")
59+
public void setUp() {
60+
network = mock(P2PNetwork.class);
61+
subProtocol = mock(SubProtocol.class);
62+
protocolManager = mock(ProtocolManager.class);
63+
64+
inboundMessageCounter = mock(LabelledMetric.class);
65+
inboundBytesCounter = mock(LabelledMetric.class);
66+
messageCounter = mock(Counter.class);
67+
bytesCounter = mock(Counter.class);
68+
69+
MetricsSystem metricsSystem = mock(MetricsSystem.class);
70+
when(metricsSystem.createLabelledCounter(
71+
any(), eq("p2p_messages_inbound"), any(), any(), any(), any()))
72+
.thenReturn(inboundMessageCounter);
73+
when(metricsSystem.createLabelledCounter(
74+
any(), eq("p2p_bytes_inbound"), any(), any(), any(), any()))
75+
.thenReturn(inboundBytesCounter);
76+
77+
when(inboundMessageCounter.labels(anyString(), anyString(), anyString()))
78+
.thenReturn(messageCounter);
79+
when(inboundBytesCounter.labels(anyString(), anyString(), anyString()))
80+
.thenReturn(bytesCounter);
81+
82+
// Setup subProtocol to return "eth" as its name so it can be looked up
83+
when(subProtocol.getName()).thenReturn("eth");
84+
85+
// Setup network mocks to allow start() to complete
86+
when(network.isListening()).thenReturn(true);
87+
88+
BiFunction<Peer, Boolean, Boolean> ethPeersShouldConnect = (peer, incoming) -> true;
89+
90+
NetworkRunner.NetworkBuilder networkBuilder = caps -> network;
91+
92+
networkRunner =
93+
NetworkRunner.builder()
94+
.protocolManagers(List.of(protocolManager))
95+
.subProtocols(subProtocol)
96+
.network(networkBuilder)
97+
.metricsSystem(metricsSystem)
98+
.ethPeersShouldConnect(ethPeersShouldConnect)
99+
.build();
100+
}
101+
102+
@Test
103+
public void shouldIncrementInboundBytesCounterWhenProcessingMessage() {
104+
// Setup
105+
Capability capability = Capability.create("eth", 68);
106+
int messageCode = 1;
107+
int messageSize = 1024;
108+
109+
when(subProtocol.getName()).thenReturn("eth");
110+
when(subProtocol.isValidMessageCode(anyInt(), eq(messageCode))).thenReturn(true);
111+
when(subProtocol.messageName(anyInt(), eq(messageCode))).thenReturn("Status");
112+
when(protocolManager.getSupportedCapabilities()).thenReturn(List.of(capability));
113+
114+
// Start network runner to register handlers
115+
networkRunner.start();
116+
117+
// Capture the message handler that was registered
118+
ArgumentCaptor<MessageCallback> handlerCaptor = ArgumentCaptor.forClass(MessageCallback.class);
119+
verify(network).subscribe(eq(capability), handlerCaptor.capture());
120+
121+
MessageCallback handler = handlerCaptor.getValue();
122+
123+
// Create test message
124+
PeerConnection peerConnection = mock(PeerConnection.class);
125+
MessageData messageData = mock(MessageData.class);
126+
when(messageData.getSize()).thenReturn(messageSize);
127+
when(messageData.getCode()).thenReturn(messageCode);
128+
129+
Message message = new DefaultMessage(peerConnection, messageData);
130+
131+
// Process message through the handler
132+
handler.onMessage(capability, message);
133+
134+
// Verify bytes counter was incremented with correct labels and size
135+
verify(inboundBytesCounter).labels(eq(capability.toString()), eq("Status"), eq("1"));
136+
verify(bytesCounter).inc(messageSize);
137+
}
138+
139+
@Test
140+
public void shouldIncrementInboundBytesCounterForMultipleMessages() {
141+
// Setup
142+
Capability capability = Capability.create("eth", 68);
143+
int messageCode1 = 1;
144+
int messageCode2 = 2;
145+
int messageSize1 = 512;
146+
int messageSize2 = 2048;
147+
148+
when(subProtocol.getName()).thenReturn("eth");
149+
when(subProtocol.isValidMessageCode(anyInt(), anyInt())).thenReturn(true);
150+
when(subProtocol.messageName(anyInt(), eq(messageCode1))).thenReturn("Status");
151+
when(subProtocol.messageName(anyInt(), eq(messageCode2))).thenReturn("GetBlockHeaders");
152+
when(protocolManager.getSupportedCapabilities()).thenReturn(List.of(capability));
153+
154+
// Start network runner to register handlers
155+
networkRunner.start();
156+
157+
// Capture the message handler
158+
ArgumentCaptor<MessageCallback> handlerCaptor = ArgumentCaptor.forClass(MessageCallback.class);
159+
verify(network).subscribe(eq(capability), handlerCaptor.capture());
160+
161+
MessageCallback handler = handlerCaptor.getValue();
162+
163+
// Create first test message
164+
PeerConnection peerConnection = mock(PeerConnection.class);
165+
MessageData messageData1 = mock(MessageData.class);
166+
when(messageData1.getSize()).thenReturn(messageSize1);
167+
when(messageData1.getCode()).thenReturn(messageCode1);
168+
Message message1 = new DefaultMessage(peerConnection, messageData1);
169+
170+
// Create second test message
171+
MessageData messageData2 = mock(MessageData.class);
172+
when(messageData2.getSize()).thenReturn(messageSize2);
173+
when(messageData2.getCode()).thenReturn(messageCode2);
174+
Message message2 = new DefaultMessage(peerConnection, messageData2);
175+
176+
// Process both messages
177+
handler.onMessage(capability, message1);
178+
handler.onMessage(capability, message2);
179+
180+
// Verify bytes counter was incremented for both messages
181+
verify(bytesCounter).inc(messageSize1);
182+
verify(bytesCounter).inc(messageSize2);
183+
verify(bytesCounter, times(2)).inc(anyLong());
184+
}
185+
186+
@Test
187+
public void shouldUseCorrectLabelsForInboundBytesCounter() {
188+
// Setup
189+
Capability capability = Capability.create("eth", 68);
190+
int messageCode = 5;
191+
int messageSize = 256;
192+
String messageName = "NewBlock";
193+
194+
when(subProtocol.getName()).thenReturn("eth");
195+
when(subProtocol.isValidMessageCode(anyInt(), eq(messageCode))).thenReturn(true);
196+
when(subProtocol.messageName(anyInt(), eq(messageCode))).thenReturn(messageName);
197+
when(protocolManager.getSupportedCapabilities()).thenReturn(List.of(capability));
198+
199+
// Start network runner to register handlers
200+
networkRunner.start();
201+
202+
// Capture the message handler
203+
ArgumentCaptor<MessageCallback> handlerCaptor = ArgumentCaptor.forClass(MessageCallback.class);
204+
verify(network).subscribe(eq(capability), handlerCaptor.capture());
205+
206+
MessageCallback handler = handlerCaptor.getValue();
207+
208+
// Create test message
209+
PeerConnection peerConnection = mock(PeerConnection.class);
210+
MessageData messageData = mock(MessageData.class);
211+
when(messageData.getSize()).thenReturn(messageSize);
212+
when(messageData.getCode()).thenReturn(messageCode);
213+
Message message = new DefaultMessage(peerConnection, messageData);
214+
215+
// Process message
216+
handler.onMessage(capability, message);
217+
218+
// Verify correct labels were used (protocol, message name, code)
219+
verify(inboundBytesCounter)
220+
.labels(eq(capability.toString()), eq(messageName), eq(String.valueOf(messageCode)));
221+
verify(bytesCounter).inc(messageSize);
222+
}
223+
}

0 commit comments

Comments
 (0)