Skip to content

Fix for wrong connection count on remote nodes #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
d193e2e
Fix for wrong connection count on remote nodes
HarshDaryani896 Jan 24, 2024
606a049
Made `distanceReporter` protected
HarshDaryani896 Jan 24, 2024
45a1006
mvn format change
HarshDaryani896 Jan 24, 2024
2611026
Taken logs from tmp-logs-ce329
ashetkar Jan 31, 2024
0a518c2
[maven-release-plugin] prepare release 4.15.0-yb-2-TESTFIX.0
HarshDaryani896 Jan 31, 2024
75dfad2
[maven-release-plugin] prepare for next development iteration
HarshDaryani896 Jan 31, 2024
6826741
[maven-release-plugin] rollback the release of 4.15.0-yb-2-TESTFIX.0
HarshDaryani896 Jan 31, 2024
6d7e7bf
[maven-release-plugin] prepare release 4.15.0-yb-2-TESTFIX.0
HarshDaryani896 Jan 31, 2024
6124ec5
[maven-release-plugin] prepare for next development iteration
HarshDaryani896 Jan 31, 2024
6bb0a5d
[maven-release-plugin] rollback the release of 4.15.0-yb-2-TESTFIX.0
HarshDaryani896 Jan 31, 2024
acfc080
[maven-release-plugin] prepare release 4.15.0-yb-2-TESTFIX.0
ashetkar Jan 31, 2024
30229ba
[maven-release-plugin] prepare for next development iteration
ashetkar Jan 31, 2024
e2bdbfb
Revert "[maven-release-plugin] prepare for next development iteration"
HarshDaryani896 Feb 5, 2024
43166d7
Revert "[maven-release-plugin] prepare release 4.15.0-yb-2-TESTFIX.0"
HarshDaryani896 Feb 5, 2024
7aad6ce
Removing unnecessary logs.
HarshDaryani896 Feb 5, 2024
d191ea8
Added logs in PartitionAwarePolicy
HarshDaryani896 Feb 5, 2024
aede876
Finetune log messages
ashetkar Feb 5, 2024
ff205b2
Updated log messages
HarshDaryani896 Feb 5, 2024
fa3d12a
Added logs for metadata refresh
ashetkar Feb 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ private void sendRequest(PrepareRequest request, Node node, int retryCount) {
while (!result.isDone() && (node = queryPlan.poll()) != null) {
channel = session.getChannel(node, logPrefix);
if (channel != null) {
if (LOG.isTraceEnabled()) {
LOG.trace(
"[{}] Polled node {} from queryPlan with hashCode = {}",
logPrefix,
node.getEndPoint(),
System.identityHashCode(queryPlan));
}
break;
} else {
recordError(node, new NodeUnavailableException(node));
Expand Down Expand Up @@ -363,7 +370,13 @@ public void operationComplete(Future<java.lang.Void> future) {
// Might happen if the timeout just fired
cancel();
} else {
LOG.trace("[{}] Request sent to {}", logPrefix, node);
if (LOG.isDebugEnabled()) {
LOG.debug(
"[{}] Request sent to {} for a queryPlan with hashCode = {}",
logPrefix,
node.getEndPoint(),
System.identityHashCode(queryPlan));
}
initialCallback = this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,13 @@ private void sendRequest(
while (!result.isDone() && (node = queryPlan.poll()) != null) {
channel = session.getChannel(node, logPrefix);
if (channel != null) {
if (LOG.isTraceEnabled()) {
LOG.trace(
"[{}] Polled node {} from queryPlan with hashCode = {}",
logPrefix,
node.getEndPoint(),
System.identityHashCode(queryPlan));
}
break;
} else {
recordError(node, new NodeUnavailableException(node));
Expand Down Expand Up @@ -535,7 +542,13 @@ public void operationComplete(Future<java.lang.Void> future) throws Exception {
scheduleNextExecution); // try next node
}
} else {
LOG.trace("[{}] Request sent on {}", logPrefix, channel);
if (LOG.isDebugEnabled()) {
LOG.debug(
"[{}] Request sent on {} for a queryPlan with hashCode = {}",
logPrefix,
channel.getEndPoint(),
System.identityHashCode(queryPlan));
}
if (result.isDone()) {
// If the handler completed since the last time we checked, cancel directly because we
// don't know if cancelScheduledTasks() has run yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,11 @@ public class BasicLoadBalancingPolicy implements LoadBalancingPolicy {
private final ConsistencyLevel defaultConsistencyLevel;

// private because they should be set in init() and never be modified after
private volatile DistanceReporter distanceReporter;
// Yugabyte specific: protected because distanceReporter is to be used in
// YugabyteDefaultLoadBalancingPolicy class
protected volatile DistanceReporter distanceReporter;
protected volatile NodeDistanceEvaluator nodeDistanceEvaluator;
private volatile String localDc;
protected volatile String localDc;
protected volatile NodeSet liveNodes;

public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
Expand Down Expand Up @@ -140,6 +141,55 @@ private DefaultSession(InternalDriverContext context, Set<EndPoint> contactPoint
}
throw t;
}
if (LOG.isInfoEnabled()) {
if (context.getConfig() != null) {
Map<String, ?> profiles = context.getConfig().getProfiles();
if (profiles != null) {
for (Map.Entry e : profiles.entrySet()) {
DriverExecutionProfile dep = (DriverExecutionProfile) e.getValue();
String cl =
dep.isDefined(DefaultDriverOption.REQUEST_CONSISTENCY)
? dep.getString(DefaultDriverOption.REQUEST_CONSISTENCY)
: "UNDEFINED";
String localDC =
dep.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)
? dep.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)
: "UNDEFINED";
String lbClass =
dep.isDefined(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS)
? dep.getString(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS)
: "UNDEFINED";
String rpClass =
dep.isDefined(DefaultDriverOption.RETRY_POLICY_CLASS)
? dep.getString(DefaultDriverOption.RETRY_POLICY_CLASS)
: "UNDEFINED";
String localDCViaAPI =
context.getLocalDatacenter((String) e.getKey()) != null
? context.getLocalDatacenter((String) e.getKey())
: "UNDEFINED";
String remotePoolSize =
dep.isDefined(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE)
? String.valueOf(dep.getInt(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE))
: "UNDEFINED";
String localPoolSize =
dep.isDefined(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE)
? String.valueOf(dep.getInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE))
: "UNDEFINED";
LOG.info(
"Driver Setting for profile {}: global CL = {}, localDC (conf) = {}, localDC (API) = {}, LB class = {},"
+ " RetryPolicy class = {}, remote pool size = {}, local pool size = {}",
e.getKey(),
cl,
localDC,
localDCViaAPI,
lbClass,
rpClass,
remotePoolSize,
localPoolSize);
}
}
}
}
}

private CompletionStage<CqlSession> init(CqlIdentifier keyspace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ private Optional<Map<QualifiedTableName, TableSplitMetadata>> createPartitionMap

Map<QualifiedTableName, TableSplitMetadata> tableSplits =
new HashMap<QualifiedTableName, TableSplitMetadata>();
StringBuilder msg = new StringBuilder();

for (AdminRow row : result) {

Expand All @@ -156,6 +157,7 @@ private Optional<Map<QualifiedTableName, TableSplitMetadata>> createPartitionMap
Map<InetAddress, String> replicaAddresses =
row.getMapOfInetAddressToString("replica_addresses");

boolean leaderFound = false;
List<Node> hosts = new ArrayList<Node>();
for (Map.Entry<InetAddress, String> entry : replicaAddresses.entrySet()) {

Expand All @@ -175,17 +177,23 @@ private Optional<Map<QualifiedTableName, TableSplitMetadata>> createPartitionMap
// Put the leader at the beginning and the rest after.
String role = entry.getValue();
if (role.equals("LEADER")) {
leaderFound = true;
hosts.add(0, host);
} else if (role.equals("FOLLOWER") || role.equals("READ_REPLICA")) {
hosts.add(host);
}
}
int startKey = getKey(row.getByteBuffer("start_key"));
int endKey = getKey(row.getByteBuffer("end_key"));
if (!leaderFound && LOG.isDebugEnabled()) {
msg.append(
tableId.getKeyspaceName() + "." + tableId.getTableName() + ": " + startKey + ", ");
}
tableSplitMetadata
.getPartitionMap()
.put(startKey, new PartitionMetadata(startKey, endKey, hosts));
}
LOG.debug("Created partition map. Tablets without leaders: {}", msg);
return Optional.ofNullable(tableSplits);
}

Expand Down
Loading