Skip to content

Fix for wrong connection count on remote nodes #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
d193e2e
Fix for wrong connection count on remote nodes
HarshDaryani896 Jan 24, 2024
606a049
Made `distanceReporter` protected
HarshDaryani896 Jan 24, 2024
45a1006
mvn format change
HarshDaryani896 Jan 24, 2024
2611026
Taken logs from tmp-logs-ce329
ashetkar Jan 31, 2024
0a518c2
[maven-release-plugin] prepare release 4.15.0-yb-2-TESTFIX.0
HarshDaryani896 Jan 31, 2024
75dfad2
[maven-release-plugin] prepare for next development iteration
HarshDaryani896 Jan 31, 2024
6826741
[maven-release-plugin] rollback the release of 4.15.0-yb-2-TESTFIX.0
HarshDaryani896 Jan 31, 2024
6d7e7bf
[maven-release-plugin] prepare release 4.15.0-yb-2-TESTFIX.0
HarshDaryani896 Jan 31, 2024
6124ec5
[maven-release-plugin] prepare for next development iteration
HarshDaryani896 Jan 31, 2024
6bb0a5d
[maven-release-plugin] rollback the release of 4.15.0-yb-2-TESTFIX.0
HarshDaryani896 Jan 31, 2024
acfc080
[maven-release-plugin] prepare release 4.15.0-yb-2-TESTFIX.0
ashetkar Jan 31, 2024
30229ba
[maven-release-plugin] prepare for next development iteration
ashetkar Jan 31, 2024
e2bdbfb
Revert "[maven-release-plugin] prepare for next development iteration"
HarshDaryani896 Feb 5, 2024
43166d7
Revert "[maven-release-plugin] prepare release 4.15.0-yb-2-TESTFIX.0"
HarshDaryani896 Feb 5, 2024
7aad6ce
Removing unnecessary logs.
HarshDaryani896 Feb 5, 2024
d191ea8
Added logs in PartitionAwarePolicy
HarshDaryani896 Feb 5, 2024
aede876
Finetune log messages
ashetkar Feb 5, 2024
ff205b2
Updated log messages
HarshDaryani896 Feb 5, 2024
fa3d12a
Added logs for metadata refresh
ashetkar Feb 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ private void sendRequest(PrepareRequest request, Node node, int retryCount) {
while (!result.isDone() && (node = queryPlan.poll()) != null) {
channel = session.getChannel(node, logPrefix);
if (channel != null) {
if (LOG.isTraceEnabled()) {
LOG.trace(
"[{}] Polled node {} from queryPlan with hashCode = {}",
logPrefix,
node.getEndPoint(),
System.identityHashCode(queryPlan));
}
break;
} else {
recordError(node, new NodeUnavailableException(node));
Expand Down Expand Up @@ -363,7 +370,13 @@ public void operationComplete(Future<java.lang.Void> future) {
// Might happen if the timeout just fired
cancel();
} else {
LOG.trace("[{}] Request sent to {}", logPrefix, node);
if (LOG.isDebugEnabled()) {
LOG.debug(
"[{}] Request sent to {} for a queryPlan with hashCode = {}",
logPrefix,
node.getEndPoint(),
System.identityHashCode(queryPlan));
}
initialCallback = this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,13 @@ private void sendRequest(
while (!result.isDone() && (node = queryPlan.poll()) != null) {
channel = session.getChannel(node, logPrefix);
if (channel != null) {
if (LOG.isTraceEnabled()) {
LOG.trace(
"[{}] Polled node {} from queryPlan with hashCode = {}",
logPrefix,
node.getEndPoint(),
System.identityHashCode(queryPlan));
}
break;
} else {
recordError(node, new NodeUnavailableException(node));
Expand Down Expand Up @@ -535,7 +542,13 @@ public void operationComplete(Future<java.lang.Void> future) throws Exception {
scheduleNextExecution); // try next node
}
} else {
LOG.trace("[{}] Request sent on {}", logPrefix, channel);
if (LOG.isDebugEnabled()) {
LOG.debug(
"[{}] Request sent on {} for a queryPlan with hashCode = {}",
logPrefix,
channel.getEndPoint(),
System.identityHashCode(queryPlan));
}
if (result.isDone()) {
// If the handler completed since the last time we checked, cancel directly because we
// don't know if cancelScheduledTasks() has run yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,11 @@ public class BasicLoadBalancingPolicy implements LoadBalancingPolicy {
private final ConsistencyLevel defaultConsistencyLevel;

// private because they should be set in init() and never be modified after
private volatile DistanceReporter distanceReporter;
// Yugabyte specific: protected because distanceReporter is to be used in
// YugabyteDefaultLoadBalancingPolicy class
protected volatile DistanceReporter distanceReporter;
protected volatile NodeDistanceEvaluator nodeDistanceEvaluator;
private volatile String localDc;
protected volatile String localDc;
protected volatile NodeSet liveNodes;

public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
Expand Down Expand Up @@ -140,6 +141,55 @@ private DefaultSession(InternalDriverContext context, Set<EndPoint> contactPoint
}
throw t;
}
if (LOG.isInfoEnabled()) {
if (context.getConfig() != null) {
Map<String, ?> profiles = context.getConfig().getProfiles();
if (profiles != null) {
for (Map.Entry e : profiles.entrySet()) {
DriverExecutionProfile dep = (DriverExecutionProfile) e.getValue();
String cl =
dep.isDefined(DefaultDriverOption.REQUEST_CONSISTENCY)
? dep.getString(DefaultDriverOption.REQUEST_CONSISTENCY)
: "UNDEFINED";
String localDC =
dep.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)
? dep.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)
: "UNDEFINED";
String lbClass =
dep.isDefined(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS)
? dep.getString(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS)
: "UNDEFINED";
String rpClass =
dep.isDefined(DefaultDriverOption.RETRY_POLICY_CLASS)
? dep.getString(DefaultDriverOption.RETRY_POLICY_CLASS)
: "UNDEFINED";
String localDCViaAPI =
context.getLocalDatacenter((String) e.getKey()) != null
? context.getLocalDatacenter((String) e.getKey())
: "UNDEFINED";
String remotePoolSize =
dep.isDefined(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE)
? String.valueOf(dep.getInt(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE))
: "UNDEFINED";
String localPoolSize =
dep.isDefined(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE)
? String.valueOf(dep.getInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE))
: "UNDEFINED";
LOG.info(
"Driver Setting for profile {}: global CL = {}, localDC (conf) = {}, localDC (API) = {}, LB class = {},"
+ " RetryPolicy class = {}, remote pool size = {}, local pool size = {}",
e.getKey(),
cl,
localDC,
localDCViaAPI,
lbClass,
rpClass,
remotePoolSize,
localPoolSize);
}
}
}
}
}

private CompletionStage<CqlSession> init(CqlIdentifier keyspace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeState;
Expand All @@ -37,15 +33,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.*;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -68,28 +56,67 @@ public void init(Map<UUID, Node> nodes, DistanceReporter distanceReporter) {
@Override
public Queue<Node> newQueryPlan(Request request, Session session) {

StringBuilder msg = null;
Iterator<Node> partitionAwareNodeIterator = null;
if (request instanceof BoundStatement) {
partitionAwareNodeIterator = getQueryPlan(session, (BoundStatement) request);
} else if (request instanceof BatchStatement) {
partitionAwareNodeIterator = getQueryPlan(session, (BatchStatement) request);
}
if (LOG.isDebugEnabled()) {
msg = new StringBuilder("Stmt: ");
msg.append(request.getClass().getSimpleName());
if (request instanceof Statement) {
msg.append(", CL: " + ((Statement) request).getConsistencyLevel());
}
}

LinkedHashSet<Node> partitionAwareNodes = null;
if (partitionAwareNodeIterator != null) {
partitionAwareNodes = new LinkedHashSet<>();
while (partitionAwareNodeIterator.hasNext()) {
partitionAwareNodes.add(partitionAwareNodeIterator.next());
}
LOG.debug("newQueryPlan: Number of Nodes = " + partitionAwareNodes.size());
}

// It so happens that the partition aware nodes could be non-empty, but the state of the nodes
// could be down.
// In such cases fallback to the inherited load-balancing logic
return !(partitionAwareNodes == null || partitionAwareNodes.isEmpty())
? new SimpleQueryPlan(partitionAwareNodes.toArray())
: super.newQueryPlan(request, session);
Queue<Node> plan;
if (partitionAwareNodes == null || partitionAwareNodes.isEmpty()) {
plan = super.newQueryPlan(request, session);
if (LOG.isDebugEnabled()) {
msg.append(", LB: YugabyteDefault");
}
} else {
plan = new SimpleQueryPlan(partitionAwareNodes.toArray());
if (LOG.isDebugEnabled()) {
msg.append(", LB: PartitionAware");
}
}
if (LOG.isDebugEnabled()) {
msg.append(
", first nodes: "
+ getInitialNodes(plan)
+ ", plan-hash: "
+ System.identityHashCode(plan));
LOG.debug(msg.toString());
}
return plan;
}

private String getInitialNodes(Queue<Node> plan) {
Object[] nodes = plan.toArray();
String nodeInfo = "";
if (nodes != null) {
if (nodes.length > 0) {
nodeInfo = ((Node) nodes[0]).getEndPoint().toString();
}
if (nodes.length > 1) {
nodeInfo += ", " + ((Node) nodes[1]).getEndPoint();
}
}
return nodeInfo;
}

/**
Expand All @@ -105,23 +132,26 @@ private Iterator<Node> getQueryPlan(Session session, BoundStatement statement) {
ColumnDefinitions variables = pstmt.getVariableDefinitions();
// Look up the hosts for the partition key. Skip statements that do not have
// bind variables.
if (variables.size() == 0) return null;
if (variables.size() == 0) {
LOG.debug("getQueryPlan(): variables.size=0 for {}", query);
return null;
}
int key = getKey(statement);
if (key < 0) return null;
String queryKeySpace = variables.get(0).getKeyspace().asInternal();
String queryTable = variables.get(0).getTable().asInternal();

LOG.debug("getQueryPlan: keyspace = " + queryKeySpace + ", query = " + query);

Optional<DefaultPartitionMetadata> partitionMetadata =
session.getMetadata().getDefaultPartitionMetadata();
if (!partitionMetadata.isPresent()) {
LOG.debug("getQueryPlan(): partitionMetadata not present for {}", query);
return null;
}

TableSplitMetadata tableSplitMetadata =
partitionMetadata.get().getTableSplitMetadata(queryKeySpace, queryTable);
if (tableSplitMetadata == null) {
LOG.debug("getQueryPlan(): tableSplitMetadata=null for {}", query);
return null;
}

Expand All @@ -130,8 +160,13 @@ private Iterator<Node> getQueryPlan(Session session, BoundStatement statement) {
super.newQueryPlan((Request) statement, session).iterator();

// This needs to manipulate the local copy of the hosts instead of the actual reference
return new UpHostIterator(
statement, new ArrayList(tableSplitMetadata.getHosts(key)), nodesFromBasePolicy);
List<Node> nodes = tableSplitMetadata.getHosts(key);
if (nodes.isEmpty()) {
LOG.debug(
"getQueryPlan(): tableSplitMetadata.getHosts(key) is empty for query {}",
pstmt.getQuery());
}
return new UpHostIterator(statement, new ArrayList(nodes), nodesFromBasePolicy);
}

/**
Expand All @@ -151,6 +186,7 @@ private Iterator<Node> getQueryPlan(Session session, BatchStatement batch) {
if (plan != null) return plan;
}
}
LOG.debug("getQueryPlan(BatchStatement): Returning null");
return null;
}

Expand Down Expand Up @@ -186,6 +222,9 @@ public UpHostIterator(
// shuffled.
if (getConsistencyLevel() == ConsistencyLevel.YB_CONSISTENT_PREFIX) {
// this is to be performed in the local copy
PreparedStatement ps = statement.getPreparedStatement();
String q = ps == null ? "" : ps.getQuery();
LOG.trace("Shuffling the nodes since CL is YB_CONSISTENT_PREFIX for query = {}", q);
Collections.shuffle(hosts);
}
}
Expand Down Expand Up @@ -290,6 +329,9 @@ public static int getKey(BoundStatement stmt) {
// are literal
// constants.
if (hashIndexes == null || hashIndexes.isEmpty()) {
LOG.info(
"getKey(): Returning negative hash (-1) PartitionKeyIndices are null or empty for {}",
pstmt.getQuery());
return -1;
}

Expand All @@ -306,13 +348,17 @@ public static int getKey(BoundStatement stmt) {
}
channel.close();

return getKey(bs.toByteArray());
int returnValue = getKey(bs.toByteArray());
if (returnValue < 0) {
LOG.info("getKey(): Returning negative hash {} for {}", returnValue, pstmt.getQuery());
}
return returnValue;
} catch (IOException e) {
// IOException should not happen at all given we are writing to the in-memory
// buffer only. So
// if it does happen, we just want to log the error but fallback to the default
// set of hosts.
LOG.error("hash key encoding failed", e);
LOG.error("getKey(): Returning negative hash. hash key encoding failed", e);
return -1;
}
}
Expand Down Expand Up @@ -382,20 +428,21 @@ private static void AppendValueToChannel(
channel.write(value);
break;
}
case ProtocolConstants.DataType.LIST:{
ListType listType = (ListType) type;
DataType dataTypeOfListValue = listType.getElementType();
int length = value.getInt();
for (int j = 0; j < length; j++) {
// Appending each element.
int size = value.getInt();
ByteBuffer buf = value.slice();
buf.limit(size);
AppendValueToChannel(dataTypeOfListValue, buf, channel);
value.position(value.position() + size);
case ProtocolConstants.DataType.LIST:
{
ListType listType = (ListType) type;
DataType dataTypeOfListValue = listType.getElementType();
int length = value.getInt();
for (int j = 0; j < length; j++) {
// Appending each element.
int size = value.getInt();
ByteBuffer buf = value.slice();
buf.limit(size);
AppendValueToChannel(dataTypeOfListValue, buf, channel);
value.position(value.position() + size);
}
break;
}
break;
}
case ProtocolConstants.DataType.SET:
{
SetType setType = (SetType) type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ public class YugabyteDefaultLoadBalancingPolicy extends BasicLoadBalancingPolicy
private static final Logger LOG =
LoggerFactory.getLogger(YugabyteDefaultLoadBalancingPolicy.class);

private volatile DistanceReporter distanceReporter;
private volatile String localDc;

protected final CopyOnWriteArraySet<Node> liveNodesInLocalDc = new CopyOnWriteArraySet<>();
protected final CopyOnWriteArraySet<Node> liveNodesInAllDC = new CopyOnWriteArraySet<>();
protected final Map<Node, AtomicLongArray> responseTimes = new ConcurrentHashMap<>();
Expand Down