Skip to content

Commit 0a7b1c8

Browse files
authored
JAVA-2959: Don't throw NoNodeAvailableException when all connections busy (#1570)
For cases in which there are no connections available to send requests to a Node in the query plan, collect the error rather than silently skipping over the node. The error will be thrown as part of an AllNodesFailedException if all nodes fail. This can happen when we've saturated the max in-flight requests across all nodes or when the request is directed to a particular node and it has no connections available (or all its connections are saturated). Note that in the latter case we used to throw a NoNodeAvailableException but we now throw AllNodesFailedException.
1 parent 2a1e37a commit 0a7b1c8

File tree

10 files changed

+178
-6
lines changed

10 files changed

+178
-6
lines changed

changelog/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
<!-- Note: contrary to 3.x, insert new entries *first* in their section -->
44

5+
### 4.14.0 (in progress)
6+
7+
- [improvement] JAVA-2959: Don't throw NoNodeAvailableException when all connections busy
8+
59
### 4.13.0
610

711
- [improvement] JAVA-2940: Add GraalVM native image build configurations

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.datastax.oss.driver.api.core.AllNodesFailedException;
2525
import com.datastax.oss.driver.api.core.CqlIdentifier;
2626
import com.datastax.oss.driver.api.core.DriverTimeoutException;
27+
import com.datastax.oss.driver.api.core.NodeUnavailableException;
2728
import com.datastax.oss.driver.api.core.ProtocolVersion;
2829
import com.datastax.oss.driver.api.core.RequestThrottlingException;
2930
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
@@ -348,6 +349,8 @@ private void sendRequest(
348349
channel = session.getChannel(node, logPrefix);
349350
if (channel != null) {
350351
break;
352+
} else {
353+
recordError(node, new NodeUnavailableException(node));
351354
}
352355
}
353356
}
@@ -455,6 +458,10 @@ CompletableFuture<ResultSetT> getPendingResult() {
455458
}
456459
}
457460

461+
private void recordError(@NonNull Node node, @NonNull Throwable error) {
462+
errors.add(new AbstractMap.SimpleEntry<>(node, error));
463+
}
464+
458465
/**
459466
* Handles the interaction with a single node in the query plan.
460467
*
@@ -1433,10 +1440,6 @@ private void reenableAutoReadIfNeeded() {
14331440

14341441
// ERROR HANDLING
14351442

1436-
private void recordError(@NonNull Node node, @NonNull Throwable error) {
1437-
errors.add(new AbstractMap.SimpleEntry<>(node, error));
1438-
}
1439-
14401443
private void trackNodeError(@NonNull Node node, @NonNull Throwable error) {
14411444
if (nodeErrorReported.compareAndSet(false, true)) {
14421445
long latencyNanos = System.nanoTime() - this.messageStartTimeNanos;

core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.datastax.oss.driver.api.core.AllNodesFailedException;
2525
import com.datastax.oss.driver.api.core.DriverException;
2626
import com.datastax.oss.driver.api.core.DriverTimeoutException;
27+
import com.datastax.oss.driver.api.core.NodeUnavailableException;
2728
import com.datastax.oss.driver.api.core.RequestThrottlingException;
2829
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2930
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
@@ -253,6 +254,8 @@ private void sendRequest(
253254
channel = session.getChannel(node, logPrefix);
254255
if (channel != null) {
255256
break;
257+
} else {
258+
recordError(node, new NodeUnavailableException(node));
256259
}
257260
}
258261
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.api.core;
17+
18+
import com.datastax.oss.driver.api.core.metadata.Node;
19+
import edu.umd.cs.findbugs.annotations.NonNull;
20+
import java.util.Objects;
21+
22+
/**
23+
* Indicates that a {@link Node} was selected in a query plan, but it had no connection available.
24+
*
25+
* <p>A common reason to encounter this error is when the configured number of connections per node
26+
* and requests per connection is not high enough to absorb the overall request rate. This can be
27+
* mitigated by tuning the following options:
28+
*
29+
* <ul>
30+
* <li>{@code advanced.connection.pool.local.size};
31+
* <li>{@code advanced.connection.pool.remote.size};
32+
* <li>{@code advanced.connection.max-requests-per-connection}.
33+
* </ul>
34+
*
35+
* See {@code reference.conf} for more details.
36+
*
37+
* <p>Another possibility is when you are trying to direct a request {@linkplain
38+
* com.datastax.oss.driver.api.core.cql.Statement#setNode(Node) to a particular node}, but that node
39+
* has no connections available.
40+
*/
41+
public class NodeUnavailableException extends DriverException {
42+
43+
private final Node node;
44+
45+
public NodeUnavailableException(Node node) {
46+
super("No connection was available to " + node, null, null, true);
47+
this.node = Objects.requireNonNull(node);
48+
}
49+
50+
@NonNull
51+
public Node getNode() {
52+
return node;
53+
}
54+
55+
@Override
56+
@NonNull
57+
public DriverException copy() {
58+
return new NodeUnavailableException(node);
59+
}
60+
}

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.datastax.oss.driver.api.core.AllNodesFailedException;
1919
import com.datastax.oss.driver.api.core.CqlIdentifier;
2020
import com.datastax.oss.driver.api.core.DriverTimeoutException;
21+
import com.datastax.oss.driver.api.core.NodeUnavailableException;
2122
import com.datastax.oss.driver.api.core.ProtocolVersion;
2223
import com.datastax.oss.driver.api.core.RequestThrottlingException;
2324
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
@@ -188,6 +189,8 @@ private void sendRequest(PrepareRequest request, Node node, int retryCount) {
188189
channel = session.getChannel(node, logPrefix);
189190
if (channel != null) {
190191
break;
192+
} else {
193+
recordError(node, new NodeUnavailableException(node));
191194
}
192195
}
193196
}

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.datastax.oss.driver.api.core.CqlIdentifier;
2020
import com.datastax.oss.driver.api.core.DriverException;
2121
import com.datastax.oss.driver.api.core.DriverTimeoutException;
22+
import com.datastax.oss.driver.api.core.NodeUnavailableException;
2223
import com.datastax.oss.driver.api.core.RequestThrottlingException;
2324
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2425
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
@@ -251,6 +252,8 @@ private void sendRequest(
251252
channel = session.getChannel(node, logPrefix);
252253
if (channel != null) {
253254
break;
255+
} else {
256+
recordError(node, new NodeUnavailableException(node));
254257
}
255258
}
256259
}

core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerNodeTargetingTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@
2525
import com.datastax.dse.driver.DseTestFixtures;
2626
import com.datastax.dse.driver.api.core.DseProtocolVersion;
2727
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet;
28-
import com.datastax.oss.driver.api.core.NoNodeAvailableException;
28+
import com.datastax.oss.driver.api.core.AllNodesFailedException;
29+
import com.datastax.oss.driver.api.core.NodeUnavailableException;
2930
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
31+
import com.datastax.oss.driver.api.core.metadata.Node;
3032
import com.datastax.oss.driver.api.core.session.Request;
3133
import com.datastax.oss.driver.api.core.session.Session;
3234
import com.datastax.oss.driver.internal.core.cql.RequestHandlerTestHarness;
3335
import com.datastax.oss.driver.internal.core.metadata.LoadBalancingPolicyWrapper;
3436
import com.tngtech.java.junit.dataprovider.UseDataProvider;
37+
import java.util.List;
38+
import java.util.Map;
3539
import java.util.concurrent.CompletionStage;
3640
import org.junit.Test;
3741
import org.mockito.InOrder;
@@ -67,7 +71,12 @@ public void should_fail_if_targeted_node_not_available(DseProtocolVersion versio
6771
assertThatStage(resultSetFuture)
6872
.isFailed(
6973
error -> {
70-
assertThat(error).isInstanceOf(NoNodeAvailableException.class);
74+
assertThat(error).isInstanceOf(AllNodesFailedException.class);
75+
Map<Node, List<Throwable>> errors =
76+
((AllNodesFailedException) error).getAllErrors();
77+
assertThat(errors).hasSize(1);
78+
List<Throwable> nodeErrors = errors.values().iterator().next();
79+
assertThat(nodeErrors).singleElement().isInstanceOf(NodeUnavailableException.class);
7180
invocations
7281
.verify(loadBalancingPolicy, never())
7382
.newQueryPlan(any(Request.class), anyString(), any(Session.class));

core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandlerTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import static org.mockito.Mockito.verify;
2626
import static org.mockito.Mockito.when;
2727

28+
import com.datastax.oss.driver.api.core.AllNodesFailedException;
29+
import com.datastax.oss.driver.api.core.NodeUnavailableException;
2830
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2931
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
3032
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
@@ -47,6 +49,7 @@
4749
import com.datastax.oss.protocol.internal.response.result.RowsMetadata;
4850
import com.datastax.oss.protocol.internal.util.Bytes;
4951
import java.nio.ByteBuffer;
52+
import java.util.List;
5053
import java.util.Map;
5154
import java.util.concurrent.CompletionStage;
5255
import org.junit.Before;
@@ -228,6 +231,39 @@ public void should_not_retry_initial_prepare_if_unrecoverable_error() {
228231
}
229232
}
230233

234+
@Test
235+
public void should_fail_if_nodes_unavailable() {
236+
RequestHandlerTestHarness.Builder harnessBuilder = RequestHandlerTestHarness.builder();
237+
try (RequestHandlerTestHarness harness =
238+
harnessBuilder.withEmptyPool(node1).withEmptyPool(node2).build()) {
239+
CompletionStage<PreparedStatement> prepareFuture =
240+
new CqlPrepareHandler(PREPARE_REQUEST, harness.getSession(), harness.getContext(), "test")
241+
.handle();
242+
assertThatStage(prepareFuture)
243+
.isFailed(
244+
error -> {
245+
assertThat(error).isInstanceOf(AllNodesFailedException.class);
246+
Map<Node, List<Throwable>> allErrors =
247+
((AllNodesFailedException) error).getAllErrors();
248+
assertThat(allErrors).hasSize(2);
249+
assertThat(allErrors)
250+
.hasEntrySatisfying(
251+
node1,
252+
nodeErrors ->
253+
assertThat(nodeErrors)
254+
.singleElement()
255+
.isInstanceOf(NodeUnavailableException.class));
256+
assertThat(allErrors)
257+
.hasEntrySatisfying(
258+
node2,
259+
nodeErrors ->
260+
assertThat(nodeErrors)
261+
.singleElement()
262+
.isInstanceOf(NodeUnavailableException.class));
263+
});
264+
}
265+
}
266+
231267
@Test
232268
public void should_fail_if_retry_policy_ignores_error() {
233269
RequestHandlerTestHarness.Builder harnessBuilder =

core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import static org.mockito.Mockito.verify;
2222
import static org.mockito.Mockito.when;
2323

24+
import com.datastax.oss.driver.api.core.AllNodesFailedException;
2425
import com.datastax.oss.driver.api.core.CqlIdentifier;
2526
import com.datastax.oss.driver.api.core.DriverTimeoutException;
2627
import com.datastax.oss.driver.api.core.NoNodeAvailableException;
28+
import com.datastax.oss.driver.api.core.NodeUnavailableException;
2729
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2830
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
2931
import com.datastax.oss.driver.api.core.cql.BoundStatement;
@@ -32,6 +34,7 @@
3234
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
3335
import com.datastax.oss.driver.api.core.cql.Row;
3436
import com.datastax.oss.driver.api.core.cql.Statement;
37+
import com.datastax.oss.driver.api.core.metadata.Node;
3538
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
3639
import com.datastax.oss.driver.internal.core.util.concurrent.CapturingTimer.CapturedTimeout;
3740
import com.datastax.oss.protocol.internal.request.Prepare;
@@ -43,6 +46,8 @@
4346
import java.time.Duration;
4447
import java.util.Collections;
4548
import java.util.Iterator;
49+
import java.util.List;
50+
import java.util.Map;
4651
import java.util.concurrent.CompletionStage;
4752
import java.util.concurrent.ConcurrentHashMap;
4853
import java.util.concurrent.ConcurrentMap;
@@ -105,6 +110,43 @@ public void should_fail_if_no_node_available() {
105110
}
106111
}
107112

113+
@Test
114+
public void should_fail_if_nodes_unavailable() {
115+
RequestHandlerTestHarness.Builder harnessBuilder = RequestHandlerTestHarness.builder();
116+
try (RequestHandlerTestHarness harness =
117+
harnessBuilder.withEmptyPool(node1).withEmptyPool(node2).build()) {
118+
CompletionStage<AsyncResultSet> resultSetFuture =
119+
new CqlRequestHandler(
120+
UNDEFINED_IDEMPOTENCE_STATEMENT,
121+
harness.getSession(),
122+
harness.getContext(),
123+
"test")
124+
.handle();
125+
assertThatStage(resultSetFuture)
126+
.isFailed(
127+
error -> {
128+
assertThat(error).isInstanceOf(AllNodesFailedException.class);
129+
Map<Node, List<Throwable>> allErrors =
130+
((AllNodesFailedException) error).getAllErrors();
131+
assertThat(allErrors).hasSize(2);
132+
assertThat(allErrors)
133+
.hasEntrySatisfying(
134+
node1,
135+
nodeErrors ->
136+
assertThat(nodeErrors)
137+
.singleElement()
138+
.isInstanceOf(NodeUnavailableException.class));
139+
assertThat(allErrors)
140+
.hasEntrySatisfying(
141+
node2,
142+
nodeErrors ->
143+
assertThat(nodeErrors)
144+
.singleElement()
145+
.isInstanceOf(NodeUnavailableException.class));
146+
});
147+
}
148+
}
149+
108150
@Test
109151
public void should_time_out_if_first_node_takes_too_long_to_respond() throws Exception {
110152
RequestHandlerTestHarness.Builder harnessBuilder = RequestHandlerTestHarness.builder();

upgrade_guide/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
## Upgrade guide
22

3+
### 4.14.0
4+
5+
#### AllNodesFailedException instead of NoNodeAvailableException in certain cases
6+
7+
[JAVA-2959](https://datastax-oss.atlassian.net/browse/JAVA-2959) changed the behavior for when a
8+
request cannot be executed because all nodes tried were busy. Previously you would get back a
9+
`NoNodeAvailableException` but you will now get back an `AllNodesFailedException` where the
10+
`getAllErrors` map contains a `NodeUnavailableException` for that node.
11+
312
### 4.13.0
413

514
#### Enhanced support for GraalVM native images

0 commit comments

Comments
 (0)