Skip to content

Commit 3b4e819

Browse files
authored
Properly handle SSL handshake updates (#1885)
Handle different handshake state changes in NIOSSL transports
1 parent c8b71ab commit 3b4e819

File tree

24 files changed

+683
-43
lines changed

24 files changed

+683
-43
lines changed

activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,12 @@ public Connection createConnection(URI remoteURI, String username, String passwo
5959
}
6060

6161
public Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
62-
ConnectionFactory factory = createConnectionFactory(remoteURI, username, password, syncPublish);
62+
return createConnection(remoteURI, username, password, clientId, syncPublish, null);
63+
}
64+
65+
public Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish,
66+
String sslProtocol) throws JMSException {
67+
ConnectionFactory factory = createConnectionFactory(remoteURI, username, password, syncPublish, sslProtocol);
6368

6469
Connection connection = factory.createConnection();
6570
connection.setExceptionListener(new ExceptionListener() {
@@ -166,7 +171,12 @@ private TopicConnectionFactory createTopicConnectionFactory(
166171
}
167172

168173
private ConnectionFactory createConnectionFactory(
169-
URI remoteURI, String username, String password, boolean syncPublish) {
174+
URI remoteURI, String username, String password, boolean syncPublish) {
175+
return createConnectionFactory(remoteURI, username, password, syncPublish, null);
176+
}
177+
178+
private ConnectionFactory createConnectionFactory(
179+
URI remoteURI, String username, String password, boolean syncPublish, String sslProtocol) {
170180

171181
String clientScheme;
172182
boolean useSSL = false;
@@ -204,6 +214,9 @@ private ConnectionFactory createConnectionFactory(
204214

205215
if (useSSL) {
206216
amqpURI += "?transport.verifyHost=false";
217+
if (sslProtocol != null) {
218+
amqpURI += "&transport.enabledProtocols=" + sslProtocol;
219+
}
207220
}
208221

209222
LOG.debug("In createConnectionFactory using URI: {}", amqpURI);

activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioPlusSslTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616
*/
1717
package org.apache.activemq.transport.amqp;
1818

19+
import jakarta.jms.Connection;
20+
import jakarta.jms.JMSException;
1921
import java.net.URI;
2022

23+
import java.net.URISyntaxException;
24+
import org.junit.Test;
2125
import org.slf4j.Logger;
2226
import org.slf4j.LoggerFactory;
2327

@@ -32,6 +36,13 @@ protected URI getBrokerURI() {
3236
return amqpNioPlusSslURI;
3337
}
3438

39+
protected Connection createConnection(String clientId, boolean syncPublish) throws JMSException {
40+
Connection connection = JMSClientContext.INSTANCE.createConnection(getBrokerURI(), "admin", "password", clientId, syncPublish,
41+
enabledProtocols);
42+
connection.start();
43+
return connection;
44+
}
45+
3546
@Override
3647
protected boolean isUseTcpConnector() {
3748
return false;
@@ -46,4 +57,15 @@ protected boolean isUseNioPlusSslConnector() {
4657
protected String getTargetConnectorName() {
4758
return "amqp+nio+ssl";
4859
}
60+
61+
@Test(timeout=30000)
62+
public void testSslHandshakeRenegotiationTlsv12() throws Exception {
63+
testSslHandshakeRenegotiation("TLSv1.2");
64+
}
65+
66+
@Test(timeout=30000)
67+
public void testSslHandshakeRenegotiationTlsv13() throws Exception {
68+
testSslHandshakeRenegotiation("TLSv1.3");
69+
}
70+
4971
}

activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,29 @@
1616
*/
1717
package org.apache.activemq.transport.amqp;
1818

19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertTrue;
22+
23+
import io.netty.channel.Channel;
24+
import io.netty.handler.ssl.SslHandler;
25+
import jakarta.jms.Message;
26+
import jakarta.jms.MessageConsumer;
27+
import jakarta.jms.MessageProducer;
28+
import jakarta.jms.Queue;
29+
import jakarta.jms.Session;
30+
import jakarta.jms.TextMessage;
31+
import java.lang.reflect.Field;
32+
import java.net.InetSocketAddress;
1933
import java.net.URI;
2034

35+
import org.apache.activemq.broker.TransportConnector;
36+
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
37+
import org.apache.activemq.util.NioSslTestUtil;
38+
import org.apache.qpid.jms.JmsConnection;
39+
import org.apache.qpid.jms.provider.amqp.AmqpProvider;
40+
import org.apache.qpid.jms.transports.netty.NettyTcpTransport;
41+
import org.objectweb.jtests.jms.framework.TestConfig;
2142
import org.slf4j.Logger;
2243
import org.slf4j.LoggerFactory;
2344

@@ -27,6 +48,16 @@
2748
public class JMSClientSslTest extends JMSClientTest {
2849
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientSslTest.class);
2950

51+
protected String enabledProtocols = null;
52+
53+
54+
@Override
55+
public void setUp() throws Exception {
56+
enabledProtocols = null;
57+
super.setUp();
58+
}
59+
60+
3061
@Override
3162
protected URI getBrokerURI() {
3263
return amqpSslURI;
@@ -46,4 +77,65 @@ protected boolean isUseSslConnector() {
4677
protected String getTargetConnectorName() {
4778
return "amqp+ssl";
4879
}
80+
81+
protected void testSslHandshakeRenegotiation(String protocol) throws Exception {
82+
enabledProtocols = protocol;
83+
84+
ActiveMQAdmin.enableJMSFrameTracing();
85+
86+
connection = createConnection();
87+
88+
JmsConnection jmsCon = (JmsConnection) connection;
89+
NettyTcpTransport transport = getNettyTransport(jmsCon);
90+
Channel channel = getNettyChannel(transport);
91+
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
92+
assertEquals(protocol, sslHandler.engine().getSession().getProtocol());
93+
94+
// trigger handshakes
95+
for (int i = 0; i < 10; i++) {
96+
sslHandler.engine().beginHandshake();
97+
}
98+
99+
// give some time for the handshake updates
100+
Thread.sleep(100);
101+
102+
// check status advances if NIOSSL, then continue
103+
// below to verify transports are not stuck
104+
checkHandshakeStatusAdvances(((InetSocketAddress)channel.localAddress()).getPort());
105+
106+
// Make sure messages still work
107+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
108+
Queue queue = session.createQueue(getDestinationName());
109+
MessageProducer p = session.createProducer(queue);
110+
111+
TextMessage message = session.createTextMessage();
112+
message.setText("hello");
113+
p.send(message);
114+
115+
MessageConsumer consumer = session.createConsumer(queue);
116+
Message msg = consumer.receive(100);
117+
assertNotNull(msg);
118+
assertTrue(msg instanceof TextMessage);
119+
120+
}
121+
122+
// This only applies to NIO SSL
123+
protected void checkHandshakeStatusAdvances(int localPort) throws Exception {
124+
TransportConnector connector = brokerService.getTransportConnectorByScheme(getBrokerURI().getScheme());
125+
NioSslTestUtil.checkHandshakeStatusAdvances(connector, localPort);
126+
}
127+
128+
private NettyTcpTransport getNettyTransport(JmsConnection jmsCon) throws Exception {
129+
Field providerField = JmsConnection.class.getDeclaredField("provider");
130+
providerField.setAccessible(true);
131+
AmqpProvider provider = (AmqpProvider) providerField.get(jmsCon);
132+
return (NettyTcpTransport) provider.getTransport();
133+
}
134+
135+
private Channel getNettyChannel(NettyTcpTransport transport) throws Exception {
136+
Field channelField = NettyTcpTransport.class.getDeclaredField("channel");
137+
channelField.setAccessible(true);
138+
return (Channel) channelField.get(transport);
139+
}
140+
49141
}

activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616
*/
1717
package org.apache.activemq.transport.amqp.auto;
1818

19+
import jakarta.jms.Connection;
20+
import jakarta.jms.JMSException;
1921
import java.net.URI;
2022

23+
import org.apache.activemq.transport.amqp.JMSClientContext;
2124
import org.apache.activemq.transport.amqp.JMSClientSslTest;
25+
import org.junit.Test;
2226
import org.slf4j.Logger;
2327
import org.slf4j.LoggerFactory;
2428

@@ -33,6 +37,13 @@ protected URI getBrokerURI() {
3337
return autoNioPlusSslURI;
3438
}
3539

40+
protected Connection createConnection(String clientId, boolean syncPublish) throws JMSException {
41+
Connection connection = JMSClientContext.INSTANCE.createConnection(getBrokerURI(), "admin", "password", clientId, syncPublish,
42+
enabledProtocols);
43+
connection.start();
44+
return connection;
45+
}
46+
3647
@Override
3748
protected boolean isUseTcpConnector() {
3849
return false;
@@ -47,4 +58,15 @@ protected boolean isUseAutoNioPlusSslConnector() {
4758
protected String getTargetConnectorName() {
4859
return "auto+nio+ssl";
4960
}
61+
62+
@Test(timeout=30000)
63+
public void testSslHandshakeRenegotiationTlsv12() throws Exception {
64+
testSslHandshakeRenegotiation("TLSv1.2");
65+
}
66+
67+
@Test(timeout=30000)
68+
public void testSslHandshakeRenegotiationTlsv13() throws Exception {
69+
testSslHandshakeRenegotiation("TLSv1.3");
70+
}
71+
5072
}

activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,24 @@ public void serviceRead() {
171171
if (!plain.hasRemaining()) {
172172
int readCount = secureRead(plain);
173173

174-
if (readCount == 0) {
174+
/*
175+
* 1) If data is read, continue below to the processCommand() call
176+
* and handle processing the data in the buffer. This takes priority
177+
* and some handshake status updates (like NEED_WRAP) can be handled
178+
* concurrently with application data (like TLSv1.3 key updates)
179+
* when the broker sends data to a client.
180+
*
181+
* 2) If no data is read, it's possible that the connection is waiting
182+
* for us to process a handshake update (either KeyUpdate for
183+
* TLS1.3 or renegotiation for TLSv1.2) so we need to check and process
184+
* any handshake updates. If the handshake status was updated,
185+
* we want to continue and loop again to recheck if we can now read new
186+
* application data into the buffer after processing the updates.
187+
*
188+
* 3) If no data is read, and no handshake update is needed, then we
189+
* are finished and can break.
190+
*/
191+
if (readCount == 0 && !handleHandshakeUpdate()) {
175192
break;
176193
}
177194

@@ -184,7 +201,11 @@ public void serviceRead() {
184201
receiveCounter.addAndGet(readCount);
185202
}
186203

187-
if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
204+
// Try and process commands if there is any data in plain if status is OK
205+
// Handshake renegotiation can happen concurrently with application data reads
206+
// so it's possible to have read data that needs processing even if the
207+
// handshake status indicates NEED_UNWRAP
208+
if (status == SSLEngineResult.Status.OK && plain.hasRemaining()) {
188209
processCommand(plain);
189210
//we have received enough bytes to detect the protocol
190211
if (receiveCounter.get() >= 8) {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.util;
18+
19+
import static org.junit.Assert.assertTrue;
20+
21+
import java.lang.reflect.Field;
22+
import javax.net.ssl.SSLEngineResult;
23+
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
24+
import javax.net.ssl.SSLSocket;
25+
import org.apache.activemq.broker.TransportConnection;
26+
import org.apache.activemq.broker.TransportConnector;
27+
import org.apache.activemq.transport.nio.NIOSSLTransport;
28+
29+
public class NioSslTestUtil {
30+
31+
public static void checkHandshakeStatusAdvances(TransportConnector connector, SSLSocket socket) throws Exception {
32+
checkHandshakeStatusAdvances(connector, socket.getLocalPort(), 10000, 10);
33+
}
34+
35+
public static void checkHandshakeStatusAdvances(TransportConnector connector, int localPort) throws Exception {
36+
checkHandshakeStatusAdvances(connector, localPort, 10000, 10);
37+
}
38+
39+
public static void checkHandshakeStatusAdvances(TransportConnector connector, int localPort,
40+
long duration, long sleepMillis) throws Exception {
41+
42+
TransportConnection con = connector.getConnections().stream()
43+
.filter(tc -> tc.getRemoteAddress().contains(
44+
Integer.toString(localPort))).findFirst().orElseThrow();
45+
46+
Field field = NIOSSLTransport.class.getDeclaredField("handshakeStatus");
47+
field.setAccessible(true);
48+
NIOSSLTransport t = con.getTransport().narrow(NIOSSLTransport.class);
49+
// If this is the NIOSSLTransport then verify we exit NEED_WRAP and NEED_TASK
50+
if (t != null) {
51+
assertTrue(Wait.waitFor(() -> {
52+
SSLEngineResult.HandshakeStatus status = (SSLEngineResult.HandshakeStatus) field.get(t);
53+
return status != HandshakeStatus.NEED_WRAP && status != HandshakeStatus.NEED_TASK;
54+
}, duration, sleepMillis));
55+
}
56+
}
57+
}

0 commit comments

Comments
 (0)