Skip to content

Commit 2cb9523

Browse files
committed
feat: Add eth_subscribe and eth_unsubscribe support to IPC service
Signed-off-by: Mael Regnery <mael@mqli.fr>
1 parent eb6db84 commit 2cb9523

File tree

3 files changed

+227
-13
lines changed

3 files changed

+227
-13
lines changed

app/src/main/java/org/hyperledger/besu/RunnerBuilder.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,8 @@ public Runner build() {
729729
ethPeers.setRlpxAgent(networkRunner.getRlpxAgent());
730730

731731
final P2PNetwork network = networkRunner.getNetwork();
732-
// ForkId in Ethereum Node Record needs updating when we transition to a new protocol spec
732+
// ForkId in Ethereum Node Record needs updating when we transition to a new
733+
// protocol spec
733734
context
734735
.getBlockchain()
735736
.observeBlockAdded(
@@ -833,6 +834,16 @@ public Runner build() {
833834
final SubscriptionManager subscriptionManager =
834835
createSubscriptionManager(vertx, transactionPool, blockchainQueries);
835836

837+
if (webSocketConfiguration.isEnabled()
838+
|| (jsonRpcIpcConfiguration != null && jsonRpcIpcConfiguration.isEnabled())) {
839+
createLogsSubscriptionService(context.getBlockchain(), subscriptionManager);
840+
841+
createNewBlockHeadersSubscriptionService(
842+
context.getBlockchain(), blockchainQueries, subscriptionManager);
843+
844+
createSyncingSubscriptionService(synchronizer, subscriptionManager);
845+
}
846+
836847
Optional<EngineJsonRpcService> engineJsonRpcService = Optional.empty();
837848
if (engineJsonRpcConfiguration.isPresent() && engineJsonRpcConfiguration.get().isEnabled()) {
838849
final Map<String, JsonRpcMethod> engineMethods =
@@ -959,13 +970,6 @@ public Runner build() {
959970
transactionSimulator,
960971
besuController.getProtocolManager().ethContext().getScheduler());
961972

962-
createLogsSubscriptionService(context.getBlockchain(), subscriptionManager);
963-
964-
createNewBlockHeadersSubscriptionService(
965-
context.getBlockchain(), blockchainQueries, subscriptionManager);
966-
967-
createSyncingSubscriptionService(synchronizer, subscriptionManager);
968-
969973
webSocketService =
970974
Optional.of(
971975
createWebsocketService(
@@ -1034,12 +1038,16 @@ public Runner build() {
10341038
transactionSimulator,
10351039
besuController.getProtocolManager().ethContext().getScheduler());
10361040

1041+
final WebSocketMethodsFactory ipcMethodsFactory =
1042+
new WebSocketMethodsFactory(subscriptionManager, ipcMethods);
1043+
10371044
jsonRpcIpcService =
10381045
Optional.of(
10391046
new JsonRpcIpcService(
10401047
vertx,
10411048
jsonRpcIpcConfiguration.getPath(),
1042-
new JsonRpcExecutor(new BaseJsonRpcProcessor(), ipcMethods)));
1049+
new JsonRpcExecutor(new BaseJsonRpcProcessor(), ipcMethodsFactory.methods()),
1050+
Optional.of(subscriptionManager)));
10431051
} else {
10441052
jsonRpcIpcService = Optional.empty();
10451053
}

ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/ipc/JsonRpcIpcService.java

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
2323
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
2424
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.RpcErrorType;
25+
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketRpcRequest;
26+
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
2527
import org.hyperledger.besu.plugin.services.rpc.RpcResponseType;
2628

2729
import java.io.IOException;
@@ -30,6 +32,7 @@
3032
import java.util.ArrayList;
3133
import java.util.List;
3234
import java.util.Optional;
35+
import java.util.UUID;
3336
import java.util.concurrent.atomic.AtomicBoolean;
3437

3538
import com.fasterxml.jackson.core.JsonGenerator;
@@ -63,12 +66,22 @@ public class JsonRpcIpcService {
6366
private final Vertx vertx;
6467
private final Path path;
6568
private final JsonRpcExecutor jsonRpcExecutor;
69+
private final Optional<SubscriptionManager> subscriptionManager;
6670
private NetServer netServer;
6771

6872
public JsonRpcIpcService(final Vertx vertx, final Path path, final JsonRpcExecutor rpcExecutor) {
73+
this(vertx, path, rpcExecutor, Optional.empty());
74+
}
75+
76+
public JsonRpcIpcService(
77+
final Vertx vertx,
78+
final Path path,
79+
final JsonRpcExecutor rpcExecutor,
80+
final Optional<SubscriptionManager> subscriptionManager) {
6981
this.vertx = vertx;
7082
this.path = path;
7183
this.jsonRpcExecutor = rpcExecutor;
84+
this.subscriptionManager = subscriptionManager;
7285
}
7386

7487
public Future<NetServer> start() {
@@ -82,14 +95,37 @@ public Future<NetServer> start() {
8295

8396
private void handleNewConnection(final NetSocket socket) {
8497
final AtomicBoolean closedSocket = new AtomicBoolean(false);
98+
final String connectionId = UUID.randomUUID().toString();
99+
100+
subscriptionManager.ifPresent(
101+
manager ->
102+
vertx
103+
.eventBus()
104+
.consumer(connectionId)
105+
.handler(
106+
msg -> {
107+
if (!closedSocket.get()) {
108+
socket.write(Buffer.buffer(msg.body().toString() + '\n'));
109+
}
110+
}));
85111

86112
socket
87-
.closeHandler(unused -> closedSocket.set(true))
113+
.closeHandler(
114+
unused -> {
115+
closedSocket.set(true);
116+
subscriptionManager.ifPresent(
117+
manager ->
118+
vertx
119+
.eventBus()
120+
.send(
121+
SubscriptionManager.EVENTBUS_REMOVE_SUBSCRIPTIONS_ADDRESS,
122+
connectionId));
123+
})
88124
.handler(
89125
JsonRpcParserHandler.ipcHandler(
90126
(jsonObj, jsonArr) -> {
91127
if (jsonObj != null) {
92-
handleSingleRequest(socket, jsonObj, closedSocket);
128+
handleSingleRequest(socket, jsonObj, closedSocket, connectionId);
93129
} else if (jsonArr != null) {
94130
handleBatchRequest(socket, jsonArr, closedSocket);
95131
}
@@ -98,7 +134,10 @@ private void handleNewConnection(final NetSocket socket) {
98134
}
99135

100136
private void handleSingleRequest(
101-
final NetSocket socket, final JsonObject jsonRpcRequest, final AtomicBoolean closedSocket) {
137+
final NetSocket socket,
138+
final JsonObject jsonRpcRequest,
139+
final AtomicBoolean closedSocket,
140+
final String connectionId) {
102141
vertx
103142
.<JsonRpcResponse>executeBlocking(
104143
promise -> {
@@ -109,7 +148,16 @@ private void handleSingleRequest(
109148
null,
110149
closedSocket::get,
111150
jsonRpcRequest,
112-
req -> req.mapTo(JsonRpcRequest.class));
151+
req -> {
152+
if (subscriptionManager.isPresent()) {
153+
final WebSocketRpcRequest websocketRequest =
154+
req.mapTo(WebSocketRpcRequest.class);
155+
websocketRequest.setConnectionId(connectionId);
156+
return websocketRequest;
157+
} else {
158+
return req.mapTo(JsonRpcRequest.class);
159+
}
160+
});
113161
promise.complete(jsonRpcResponse);
114162
})
115163
.onSuccess(

ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/ipc/JsonRpcIpcServiceTest.java

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,18 @@
2323
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
2424
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
2525
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
26+
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketMethodsFactory;
27+
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
28+
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
2629

2730
import java.nio.file.Path;
2831
import java.util.Arrays;
2932
import java.util.Collections;
33+
import java.util.HashMap;
3034
import java.util.Map;
35+
import java.util.Optional;
3136
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicInteger;
3238

3339
import io.vertx.core.Vertx;
3440
import io.vertx.core.VertxOptions;
@@ -259,4 +265,156 @@ private void assertSocketCall(
259265
}))
260266
.write(request)))));
261267
}
268+
269+
@Test
270+
void subscriptionRequestSuccessful() {
271+
final Path socketPath = tempDir.resolve("besu-test.ipc");
272+
final SubscriptionManager subscriptionManager =
273+
new SubscriptionManager(new NoOpMetricsSystem());
274+
vertx.deployVerticle(subscriptionManager);
275+
276+
final Map<String, JsonRpcMethod> methods =
277+
new WebSocketMethodsFactory(subscriptionManager, new HashMap<>()).methods();
278+
279+
final JsonRpcIpcService service =
280+
new JsonRpcIpcService(
281+
vertx,
282+
socketPath,
283+
new JsonRpcExecutor(new BaseJsonRpcProcessor(), methods),
284+
Optional.of(subscriptionManager));
285+
286+
final String request = "{\"id\":1,\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"]}\n";
287+
final String expectedResponse = "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0x1\"}";
288+
289+
service
290+
.start()
291+
.onComplete(
292+
testContext.succeeding(
293+
server ->
294+
vertx
295+
.createNetClient()
296+
.connect(SocketAddress.domainSocketAddress(socketPath.toString()))
297+
.onComplete(
298+
testContext.succeeding(
299+
socket ->
300+
socket
301+
.handler(
302+
buffer ->
303+
testContext.verify(
304+
() -> {
305+
assertThat(buffer.toString().trim())
306+
.isEqualTo(expectedResponse);
307+
service
308+
.stop()
309+
.onComplete(
310+
testContext.succeedingThenComplete());
311+
}))
312+
.write(Buffer.buffer(request))))));
313+
}
314+
315+
@Test
316+
void unsubscribeRequestSuccessful() {
317+
final Path socketPath = tempDir.resolve("besu-test.ipc");
318+
final SubscriptionManager subscriptionManager =
319+
new SubscriptionManager(new NoOpMetricsSystem());
320+
vertx.deployVerticle(subscriptionManager);
321+
322+
final Map<String, JsonRpcMethod> methods =
323+
new WebSocketMethodsFactory(subscriptionManager, new HashMap<>()).methods();
324+
325+
final JsonRpcIpcService service =
326+
new JsonRpcIpcService(
327+
vertx,
328+
socketPath,
329+
new JsonRpcExecutor(new BaseJsonRpcProcessor(), methods),
330+
Optional.of(subscriptionManager));
331+
332+
final String subscribeRequest =
333+
"{\"id\":1,\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"]}\n";
334+
final String unsubscribeRequest =
335+
"{\"id\":2,\"method\":\"eth_unsubscribe\",\"params\":[\"0x1\"]}\n";
336+
final AtomicInteger messageCount = new AtomicInteger(0);
337+
338+
service
339+
.start()
340+
.onComplete(
341+
testContext.succeeding(
342+
server ->
343+
vertx
344+
.createNetClient()
345+
.connect(SocketAddress.domainSocketAddress(socketPath.toString()))
346+
.onComplete(
347+
testContext.succeeding(
348+
socket -> {
349+
socket.handler(
350+
buffer -> {
351+
final int count = messageCount.incrementAndGet();
352+
if (count == 1) {
353+
// First response is subscribe
354+
socket.write(Buffer.buffer(unsubscribeRequest));
355+
} else if (count == 2) {
356+
// Second response is unsubscribe
357+
testContext.verify(
358+
() -> {
359+
assertThat(buffer.toString().trim())
360+
.contains("\"result\":true");
361+
service
362+
.stop()
363+
.onComplete(
364+
testContext.succeedingThenComplete());
365+
});
366+
}
367+
});
368+
socket.write(Buffer.buffer(subscribeRequest));
369+
}))));
370+
}
371+
372+
@Test
373+
void batchRequestDoesNotSupportSubscriptions() {
374+
final Path socketPath = tempDir.resolve("besu-test.ipc");
375+
final SubscriptionManager subscriptionManager =
376+
new SubscriptionManager(new NoOpMetricsSystem());
377+
vertx.deployVerticle(subscriptionManager);
378+
379+
final Map<String, JsonRpcMethod> methods =
380+
new WebSocketMethodsFactory(subscriptionManager, new HashMap<>()).methods();
381+
382+
final JsonRpcIpcService service =
383+
new JsonRpcIpcService(
384+
vertx,
385+
socketPath,
386+
new JsonRpcExecutor(new BaseJsonRpcProcessor(), methods),
387+
Optional.of(subscriptionManager));
388+
389+
final String batchRequest =
390+
"[{\"id\":1,\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"]},"
391+
+ "{\"id\":2,\"method\":\"eth_subscribe\",\"params\":[\"logs\"]}]";
392+
393+
service
394+
.start()
395+
.onComplete(
396+
testContext.succeeding(
397+
server ->
398+
vertx
399+
.createNetClient()
400+
.connect(SocketAddress.domainSocketAddress(socketPath.toString()))
401+
.onComplete(
402+
testContext.succeeding(
403+
socket ->
404+
socket
405+
.handler(
406+
buffer ->
407+
testContext.verify(
408+
() -> {
409+
// Batch requests with subscriptions should
410+
// fail
411+
assertThat(buffer.toString())
412+
.contains("\"error\"");
413+
service
414+
.stop()
415+
.onComplete(
416+
testContext.succeedingThenComplete());
417+
}))
418+
.write(Buffer.buffer(batchRequest))))));
419+
}
262420
}

0 commit comments

Comments
 (0)