Skip to content

Commit bf45bf8

Browse files
committed
When doing PLAIN or CURVE authentication, the logic can be exchanged.
The client is doing the bind, and the server doing the connect. That was not handled, a connect socket was not expected to reuse the session, and so found an already configured zapPipe. It’s now handled. Some error message was not returning any message, the specification says that an empty error should be returned instead. Big rewrite of the tests for mechanisms, the big test function is splited and more code is shared.
1 parent 1f36de0 commit bf45bf8

File tree

7 files changed

+830
-679
lines changed

7 files changed

+830
-679
lines changed

src/main/java/zmq/io/SessionBase.java

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -302,39 +302,40 @@ protected void processPlug()
302302

303303
public int zapConnect()
304304
{
305-
assert (zapPipe == null);
306-
307-
Ctx.Endpoint peer = findEndpoint("inproc://zeromq.zap.01");
308-
if (peer.socket == null) {
309-
errno.set(ZError.ECONNREFUSED);
310-
return ZError.ECONNREFUSED;
311-
}
312-
if (peer.options.type != ZMQ.ZMQ_REP && peer.options.type != ZMQ.ZMQ_ROUTER &&
313-
peer.options.type != ZMQ.ZMQ_SERVER) {
314-
errno.set(ZError.ECONNREFUSED);
315-
return ZError.ECONNREFUSED;
316-
}
305+
// Session might be reused with zap connexion already established, don't panic
306+
if (zapPipe == null) {
307+
Ctx.Endpoint peer = findEndpoint("inproc://zeromq.zap.01");
308+
if (peer.socket == null) {
309+
errno.set(ZError.ECONNREFUSED);
310+
return ZError.ECONNREFUSED;
311+
}
312+
if (peer.options.type != ZMQ.ZMQ_REP && peer.options.type != ZMQ.ZMQ_ROUTER &&
313+
peer.options.type != ZMQ.ZMQ_SERVER) {
314+
errno.set(ZError.ECONNREFUSED);
315+
return ZError.ECONNREFUSED;
316+
}
317317

318-
// Create a bi-directional pipe that will connect
319-
// session with zap socket.
320-
ZObject[] parents = { this, peer.socket };
321-
int[] hwms = { 0, 0 };
322-
boolean[] conflates = { false, false };
323-
Pipe[] pipes = Pipe.pair(parents, hwms, conflates);
318+
// Create a bi-directional pipe that will connect
319+
// session with zap socket.
320+
ZObject[] parents = { this, peer.socket };
321+
int[] hwms = { 0, 0 };
322+
boolean[] conflates = { false, false };
323+
Pipe[] pipes = Pipe.pair(parents, hwms, conflates);
324324

325-
// Attach local end of the pipe to this socket object.
326-
zapPipe = pipes[0];
327-
zapPipe.setNoDelay();
328-
zapPipe.setEventSink(this);
325+
// Attach local end of the pipe to this socket object.
326+
zapPipe = pipes[0];
327+
zapPipe.setNoDelay();
328+
zapPipe.setEventSink(this);
329329

330-
sendBind(peer.socket, pipes[1], false);
330+
sendBind(peer.socket, pipes[1], false);
331331

332-
// Send empty identity if required by the peer.
333-
if (peer.options.recvIdentity) {
334-
Msg id = new Msg();
335-
id.setFlags(Msg.IDENTITY);
336-
zapPipe.write(id);
337-
zapPipe.flush();
332+
// Send empty identity if required by the peer.
333+
if (peer.options.recvIdentity) {
334+
Msg id = new Msg();
335+
id.setFlags(Msg.IDENTITY);
336+
zapPipe.write(id);
337+
zapPipe.flush();
338+
}
338339
}
339340
return 0;
340341
}

src/main/java/zmq/io/mechanism/curve/CurveServerMechanism.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,9 @@ private int produceError(Msg msg)
513513
if (statusCode != null) {
514514
msg.putShortString(statusCode);
515515
}
516+
else {
517+
msg.putShortString("");
518+
}
516519

517520
return 0;
518521
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package zmq.io.mechanism;
2+
3+
import java.io.IOException;
4+
import java.io.OutputStream;
5+
import java.net.Socket;
6+
import java.util.Optional;
7+
import java.util.concurrent.CompletableFuture;
8+
import java.util.concurrent.ExecutionException;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.concurrent.TimeoutException;
11+
import java.util.function.BiFunction;
12+
import java.util.function.Consumer;
13+
import java.util.function.Function;
14+
15+
import zmq.Ctx;
16+
import zmq.Helper;
17+
import zmq.Msg;
18+
import zmq.Options;
19+
import zmq.SocketBase;
20+
import zmq.ZMQ;
21+
import zmq.util.TestUtils;
22+
23+
import static org.hamcrest.CoreMatchers.is;
24+
import static org.hamcrest.CoreMatchers.notNullValue;
25+
import static org.hamcrest.CoreMatchers.nullValue;
26+
import static org.hamcrest.MatcherAssert.assertThat;
27+
28+
class MechanismTester
29+
{
30+
abstract static class TestContext
31+
{
32+
Ctx zctxt;
33+
SocketBase server;
34+
SocketBase client;
35+
SocketBase zapHandler;
36+
String host = "tcp://127.0.0.1:*";
37+
}
38+
39+
static <C extends TestContext> Boolean runTest(C testCtx, boolean withzap, Function<C, Boolean> tested,
40+
BiFunction<SocketBase, CompletableFuture<Boolean>, ZapHandler> zapProvider,
41+
Runnable configurator) throws InterruptedException
42+
{
43+
testCtx.zctxt = ZMQ.createContext();
44+
45+
// Future to hold the result of Zap handler processing
46+
// true: zap allowed the connexion
47+
// false: zap refused the connexion
48+
// null: zap didn't do any processing
49+
CompletableFuture<Boolean> zapFuture = new CompletableFuture<>();
50+
Thread zapThread;
51+
52+
if (withzap) {
53+
testCtx.zapHandler = ZMQ.socket(testCtx.zctxt, ZMQ.ZMQ_REP);
54+
ZapHandler handler = zapProvider.apply(testCtx.zapHandler, zapFuture);
55+
zapThread = handler.start();
56+
if (zapFuture.isDone()) {
57+
// zap future should not be already finished
58+
try {
59+
assertThat(zapFuture.get().toString(), false);
60+
}
61+
catch (InterruptedException | ExecutionException e) {
62+
throw new IllegalStateException(e);
63+
}
64+
}
65+
}
66+
else {
67+
zapFuture.complete(null);
68+
zapThread = null;
69+
testCtx.zapHandler = null;
70+
}
71+
testCtx.server = ZMQ.socket(testCtx.zctxt, ZMQ.ZMQ_DEALER);
72+
assertThat(testCtx.server, notNullValue());
73+
74+
testCtx.client = ZMQ.socket(testCtx.zctxt, ZMQ.ZMQ_DEALER);
75+
assertThat(testCtx.client, notNullValue());
76+
77+
configurator.run();
78+
79+
boolean isSuccess = tested.apply(testCtx);
80+
81+
// Exchange messages only if both sockets are used, some tests uses a plain socket
82+
if (testCtx.server != null && testCtx.client != null) {
83+
if (isSuccess) {
84+
Helper.bounce(testCtx.server, testCtx.client);
85+
}
86+
else {
87+
Helper.expectBounceFail(testCtx.server, testCtx.client);
88+
}
89+
}
90+
91+
Optional.ofNullable(testCtx.client).ifPresent(ZMQ::closeZeroLinger);
92+
Optional.ofNullable(testCtx.server).ifPresent(ZMQ::closeZeroLinger);
93+
94+
// Wait until ZAP handler terminates
95+
//Optional.ofNullable(testCtx.zapHandler).ifPresent(ZMQ::closeZeroLinger);
96+
Optional.ofNullable(zapThread).ifPresent(t -> {
97+
try {
98+
t.interrupt();
99+
t.join(5000);
100+
}
101+
catch (InterruptedException e) {
102+
throw new IllegalStateException(e);
103+
}
104+
});
105+
106+
// Shutdown
107+
ZMQ.term(testCtx.zctxt);
108+
109+
try {
110+
return zapFuture.get(5, TimeUnit.SECONDS);
111+
}
112+
catch (InterruptedException | ExecutionException | TimeoutException e) {
113+
throw new IllegalStateException(e);
114+
}
115+
}
116+
117+
public static <T extends TestContext> boolean testRawSocket(T ctx)
118+
{
119+
try {
120+
boolean rc;
121+
122+
int timeout = 250;
123+
ZMQ.setSocketOption(ctx.server, ZMQ.ZMQ_RCVTIMEO, timeout);
124+
125+
rc = ZMQ.bind(ctx.server, ctx.host);
126+
assertThat(rc, is(true));
127+
int port = TestUtils.port((String) ZMQ.getSocketOptionExt(ctx.server, ZMQ.ZMQ_LAST_ENDPOINT));
128+
129+
ZMQ.closeZeroLinger(ctx.client);
130+
ctx.client = null;
131+
132+
try (Socket sock = new Socket("127.0.0.1", port)) {
133+
// send anonymous ZMTP/1.0 greeting
134+
OutputStream out = sock.getOutputStream();
135+
out.write(("1" + 0x00).getBytes(ZMQ.CHARSET));
136+
// send sneaky message that shouldn't be received
137+
out.write(
138+
("8" + 0x00 + "sneaky" + 0x00)
139+
.getBytes(ZMQ.CHARSET));
140+
141+
Msg msg = ZMQ.recv(ctx.server, 0);
142+
assertThat(msg, nullValue());
143+
144+
}
145+
return false;
146+
}
147+
catch (IOException e) {
148+
throw new RuntimeException(e);
149+
}
150+
}
151+
152+
public static void checkOptions(Mechanisms mechanism, Consumer<Options> setOptions)
153+
{
154+
Options opt = new Options();
155+
setOptions.accept(opt);
156+
mechanism.check(opt);
157+
}
158+
159+
private MechanismTester()
160+
{
161+
// static only class
162+
}
163+
}

0 commit comments

Comments
 (0)