Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
- Performance: 5-6x faster toFastHex calculation for engine_getBlobsV2 [#9426](https://github.com/hyperledger/besu/pull/9426)
- Performance: Optimise `engine_getPayload*` methods and `engine_getBlobsV2` [#9445](https://github.com/hyperledger/besu/pull/9445)
- Add Linea named networks for mainnet and sepolia on Linea network [#9436](https://github.com/hyperledger/besu/pull/9436)
- Add eth_subscribe and eth_unsubscribe support to IPC service [#9504](https://github.com/hyperledger/besu/pull/9504)


### Bug fixes
- Fix loss of colored output in terminal when using `--color-enabled=true` option [#8908](https://github.com/hyperledger/besu/issues/8908)
Expand Down
26 changes: 17 additions & 9 deletions app/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,8 @@ public Runner build() {
ethPeers.setRlpxAgent(networkRunner.getRlpxAgent());

final P2PNetwork network = networkRunner.getNetwork();
// ForkId in Ethereum Node Record needs updating when we transition to a new protocol spec
// ForkId in Ethereum Node Record needs updating when we transition to a new
// protocol spec
context
.getBlockchain()
.observeBlockAdded(
Expand Down Expand Up @@ -833,6 +834,16 @@ public Runner build() {
final SubscriptionManager subscriptionManager =
createSubscriptionManager(vertx, transactionPool, blockchainQueries);

if (webSocketConfiguration.isEnabled()
|| (jsonRpcIpcConfiguration != null && jsonRpcIpcConfiguration.isEnabled())) {
createLogsSubscriptionService(context.getBlockchain(), subscriptionManager);

createNewBlockHeadersSubscriptionService(
context.getBlockchain(), blockchainQueries, subscriptionManager);

createSyncingSubscriptionService(synchronizer, subscriptionManager);
}

Optional<EngineJsonRpcService> engineJsonRpcService = Optional.empty();
if (engineJsonRpcConfiguration.isPresent() && engineJsonRpcConfiguration.get().isEnabled()) {
final Map<String, JsonRpcMethod> engineMethods =
Expand Down Expand Up @@ -959,13 +970,6 @@ public Runner build() {
transactionSimulator,
besuController.getProtocolManager().ethContext().getScheduler());

createLogsSubscriptionService(context.getBlockchain(), subscriptionManager);

createNewBlockHeadersSubscriptionService(
context.getBlockchain(), blockchainQueries, subscriptionManager);

createSyncingSubscriptionService(synchronizer, subscriptionManager);

webSocketService =
Optional.of(
createWebsocketService(
Expand Down Expand Up @@ -1034,12 +1038,16 @@ public Runner build() {
transactionSimulator,
besuController.getProtocolManager().ethContext().getScheduler());

final WebSocketMethodsFactory ipcMethodsFactory =
new WebSocketMethodsFactory(subscriptionManager, ipcMethods);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this change necessary?

Copy link
Copy Markdown
Contributor Author

@MqllR MqllR Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried without, but it turns out that this is what:

So unless we want to manage it in another way, I'd answer yes, it's necessary !

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tagging this as #techdebt for later investigation, you've found a code smell: reactive methods are bound to a transport type, which is excessive coupling.


jsonRpcIpcService =
Optional.of(
new JsonRpcIpcService(
vertx,
jsonRpcIpcConfiguration.getPath(),
new JsonRpcExecutor(new BaseJsonRpcProcessor(), ipcMethods)));
new JsonRpcExecutor(new BaseJsonRpcProcessor(), ipcMethodsFactory.methods()),
Optional.of(subscriptionManager)));
} else {
jsonRpcIpcService = Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.RpcErrorType;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketRpcRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.plugin.services.rpc.RpcResponseType;

import java.io.IOException;
Expand All @@ -30,6 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import com.fasterxml.jackson.core.JsonGenerator;
Expand Down Expand Up @@ -63,12 +66,22 @@ public class JsonRpcIpcService {
private final Vertx vertx;
private final Path path;
private final JsonRpcExecutor jsonRpcExecutor;
private final Optional<SubscriptionManager> subscriptionManager;
private NetServer netServer;

public JsonRpcIpcService(final Vertx vertx, final Path path, final JsonRpcExecutor rpcExecutor) {
this(vertx, path, rpcExecutor, Optional.empty());
}

public JsonRpcIpcService(
final Vertx vertx,
final Path path,
final JsonRpcExecutor rpcExecutor,
final Optional<SubscriptionManager> subscriptionManager) {
this.vertx = vertx;
this.path = path;
this.jsonRpcExecutor = rpcExecutor;
this.subscriptionManager = subscriptionManager;
}

public Future<NetServer> start() {
Expand All @@ -82,14 +95,37 @@ public Future<NetServer> start() {

private void handleNewConnection(final NetSocket socket) {
final AtomicBoolean closedSocket = new AtomicBoolean(false);
final String connectionId = UUID.randomUUID().toString();

subscriptionManager.ifPresent(
manager ->
vertx
.eventBus()
.consumer(connectionId)
.handler(
msg -> {
if (!closedSocket.get()) {
socket.write(Buffer.buffer(msg.body().toString() + '\n'));
}
}));

socket
.closeHandler(unused -> closedSocket.set(true))
.closeHandler(
unused -> {
closedSocket.set(true);
subscriptionManager.ifPresent(
manager ->
vertx
.eventBus()
.send(
SubscriptionManager.EVENTBUS_REMOVE_SUBSCRIPTIONS_ADDRESS,
connectionId));
})
.handler(
JsonRpcParserHandler.ipcHandler(
(jsonObj, jsonArr) -> {
if (jsonObj != null) {
handleSingleRequest(socket, jsonObj, closedSocket);
handleSingleRequest(socket, jsonObj, closedSocket, connectionId);
} else if (jsonArr != null) {
handleBatchRequest(socket, jsonArr, closedSocket);
}
Expand All @@ -98,7 +134,10 @@ private void handleNewConnection(final NetSocket socket) {
}

private void handleSingleRequest(
final NetSocket socket, final JsonObject jsonRpcRequest, final AtomicBoolean closedSocket) {
final NetSocket socket,
final JsonObject jsonRpcRequest,
final AtomicBoolean closedSocket,
final String connectionId) {
vertx
.<JsonRpcResponse>executeBlocking(
promise -> {
Expand All @@ -109,7 +148,16 @@ private void handleSingleRequest(
null,
closedSocket::get,
jsonRpcRequest,
req -> req.mapTo(JsonRpcRequest.class));
req -> {
if (subscriptionManager.isPresent()) {
final WebSocketRpcRequest websocketRequest =
req.mapTo(WebSocketRpcRequest.class);
websocketRequest.setConnectionId(connectionId);
return websocketRequest;
} else {
return req.mapTo(JsonRpcRequest.class);
}
});
promise.complete(jsonRpcResponse);
})
.onSuccess(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketMethodsFactory;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
Expand Down Expand Up @@ -259,4 +265,156 @@ private void assertSocketCall(
}))
.write(request)))));
}

@Test
void subscriptionRequestSuccessful() {
final Path socketPath = tempDir.resolve("besu-test.ipc");
final SubscriptionManager subscriptionManager =
new SubscriptionManager(new NoOpMetricsSystem());
vertx.deployVerticle(subscriptionManager);

final Map<String, JsonRpcMethod> methods =
new WebSocketMethodsFactory(subscriptionManager, new HashMap<>()).methods();

final JsonRpcIpcService service =
new JsonRpcIpcService(
vertx,
socketPath,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), methods),
Optional.of(subscriptionManager));

final String request = "{\"id\":1,\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"]}\n";
final String expectedResponse = "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0x1\"}";

service
.start()
.onComplete(
testContext.succeeding(
server ->
vertx
.createNetClient()
.connect(SocketAddress.domainSocketAddress(socketPath.toString()))
.onComplete(
testContext.succeeding(
socket ->
socket
.handler(
buffer ->
testContext.verify(
() -> {
assertThat(buffer.toString().trim())
.isEqualTo(expectedResponse);
service
.stop()
.onComplete(
testContext.succeedingThenComplete());
}))
.write(Buffer.buffer(request))))));
}

@Test
void unsubscribeRequestSuccessful() {
final Path socketPath = tempDir.resolve("besu-test.ipc");
final SubscriptionManager subscriptionManager =
new SubscriptionManager(new NoOpMetricsSystem());
vertx.deployVerticle(subscriptionManager);

final Map<String, JsonRpcMethod> methods =
new WebSocketMethodsFactory(subscriptionManager, new HashMap<>()).methods();

final JsonRpcIpcService service =
new JsonRpcIpcService(
vertx,
socketPath,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), methods),
Optional.of(subscriptionManager));

final String subscribeRequest =
"{\"id\":1,\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"]}\n";
final String unsubscribeRequest =
"{\"id\":2,\"method\":\"eth_unsubscribe\",\"params\":[\"0x1\"]}\n";
final AtomicInteger messageCount = new AtomicInteger(0);

service
.start()
.onComplete(
testContext.succeeding(
server ->
vertx
.createNetClient()
.connect(SocketAddress.domainSocketAddress(socketPath.toString()))
.onComplete(
testContext.succeeding(
socket -> {
socket.handler(
buffer -> {
final int count = messageCount.incrementAndGet();
if (count == 1) {
// First response is subscribe
socket.write(Buffer.buffer(unsubscribeRequest));
} else if (count == 2) {
// Second response is unsubscribe
testContext.verify(
() -> {
assertThat(buffer.toString().trim())
.contains("\"result\":true");
service
.stop()
.onComplete(
testContext.succeedingThenComplete());
});
}
});
socket.write(Buffer.buffer(subscribeRequest));
}))));
}

@Test
void batchRequestDoesNotSupportSubscriptions() {
final Path socketPath = tempDir.resolve("besu-test.ipc");
final SubscriptionManager subscriptionManager =
new SubscriptionManager(new NoOpMetricsSystem());
vertx.deployVerticle(subscriptionManager);

final Map<String, JsonRpcMethod> methods =
new WebSocketMethodsFactory(subscriptionManager, new HashMap<>()).methods();

final JsonRpcIpcService service =
new JsonRpcIpcService(
vertx,
socketPath,
new JsonRpcExecutor(new BaseJsonRpcProcessor(), methods),
Optional.of(subscriptionManager));

final String batchRequest =
"[{\"id\":1,\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"]},"
+ "{\"id\":2,\"method\":\"eth_subscribe\",\"params\":[\"logs\"]}]";

service
.start()
.onComplete(
testContext.succeeding(
server ->
vertx
.createNetClient()
.connect(SocketAddress.domainSocketAddress(socketPath.toString()))
.onComplete(
testContext.succeeding(
socket ->
socket
.handler(
buffer ->
testContext.verify(
() -> {
// Batch requests with subscriptions should
// fail
assertThat(buffer.toString())
.contains("\"error\"");
service
.stop()
.onComplete(
testContext.succeedingThenComplete());
}))
.write(Buffer.buffer(batchRequest))))));
}
}