From a953a09ddc9729142490670641c4f058c806d874 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Mon, 17 Feb 2025 01:31:01 +0530 Subject: [PATCH 1/7] Add query wait time and query end to end task time metrics --- .../phoenix/iterate/BaseResultIterators.java | 16 +- .../iterate/RoundRobinResultIterator.java | 37 ++- .../org/apache/phoenix/job/JobManager.java | 32 +++ .../apache/phoenix/monitoring/MetricType.java | 5 + .../monitoring/OverAllQueryMetrics.java | 24 ++ .../phoenix/monitoring/PhoenixMetricsIT.java | 237 +++++++++++++++++- .../org/apache/phoenix/query/BaseTest.java | 14 ++ 7 files changed, 352 insertions(+), 13 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 30bc118da91..66baf4a7e8c 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -85,6 +85,7 @@ import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.VersionUtil; +import org.apache.phoenix.job.JobManager; import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.monitoring.OverAllQueryMetrics; import org.apache.phoenix.parse.FilterableStatement; @@ -111,7 +112,6 @@ import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; import org.apache.phoenix.schema.stats.StatisticsUtil; -import org.apache.phoenix.schema.types.PVarbinaryEncoded; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ClientUtil; @@ -1439,6 +1439,8 @@ private List getIterators(List> scan, Connecti SQLException toThrow = null; final HashCacheClient hashCacheClient = new HashCacheClient(context.getConnection()); int queryTimeOut = context.getStatement().getQueryTimeoutInMillis(); + long maxTaskQueueWaitTime = 0; + long maxTaskEndToEndTime = 0; try { submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper, maxQueryEndTime); boolean clearedCache = false; @@ -1467,7 +1469,13 @@ private List getIterators(List> scan, Connecti && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_START_ROW_SUFFIX), previousScan.getScan().getAttribute(SCAN_START_ROW_SUFFIX))==0)) { continue; } - PeekingResultIterator iterator = scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS); + Future futureTask = scanPair.getSecond(); + PeekingResultIterator iterator = futureTask.get(timeOutForScan, + TimeUnit.MILLISECONDS); + long taskQueueWaitTime = JobManager.getTaskQueueWaitTime(futureTask); + long taskEndToEndTime = JobManager.getTaskEndToEndTime(futureTask); + maxTaskQueueWaitTime = Math.max(maxTaskQueueWaitTime, taskQueueWaitTime); + maxTaskEndToEndTime = Math.max(maxTaskEndToEndTime, taskEndToEndTime); concatIterators.add(iterator); previousScan.setScan(scanPair.getFirst()); } catch (ExecutionException e) { @@ -1571,9 +1579,11 @@ private List getIterators(List> scan, Connecti } } } finally { + OverAllQueryMetrics overAllQueryMetrics = context.getOverallQueryMetrics(); + overAllQueryMetrics.updateQueryWaitTime(maxTaskQueueWaitTime); + overAllQueryMetrics.updateQueryTaskEndToEndTime(maxTaskEndToEndTime); if (toThrow != null) { GLOBAL_FAILED_QUERY_COUNTER.increment(); - OverAllQueryMetrics overAllQueryMetrics = context.getOverallQueryMetrics(); overAllQueryMetrics.queryFailed(); if (context.getScanRanges().isPointLookup()) { overAllQueryMetrics.queryPointLookupFailed(); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java index 2daee7983dd..e2dee154816 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java @@ -19,12 +19,12 @@ import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER; +import static org.apache.phoenix.job.JobManager.JobCallable; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -32,7 +32,10 @@ .ExplainPlanAttributesBuilder; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.job.JobManager; import org.apache.phoenix.monitoring.OverAllQueryMetrics; +import org.apache.phoenix.monitoring.ReadMetricQueue; +import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ClientUtil; @@ -167,7 +170,7 @@ public void explain(List planSteps, } @VisibleForTesting - int getNumberOfParallelFetches() { + public int getNumberOfParallelFetches() { return numParallelFetches; } @@ -230,6 +233,8 @@ private List fetchNextBatch() throws SQLException { Collections.shuffle(openIterators); boolean success = false; SQLException toThrow = null; + long maxTaskQueueWaitTime = 0; + long maxTaskEndToEndTime = 0; try { StatementContext context = plan.getContext(); final ConnectionQueryServices services = context.getConnection().getQueryServices(); @@ -238,19 +243,40 @@ private List fetchNextBatch() throws SQLException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Performing parallel fetch for " + openIterators.size() + " iterators. "); } + String physicalTableName = plan.getTableRef().getTable().getPhysicalName().getString(); for (final RoundRobinIterator itr : openIterators) { - Future future = executor.submit(new Callable() { + ReadMetricQueue readMetricQueue = context.getReadMetricsQueue(); + TaskExecutionMetricsHolder taskMetrics = + new TaskExecutionMetricsHolder(readMetricQueue, physicalTableName); + Future future = executor.submit(new JobCallable() { @Override public Tuple call() throws Exception { // Read the next record to refill the scanner's cache. return itr.next(); } + + @Override + public Object getJobId() { + // Prior to using JobCallable, every callable refilling the scanner cache + // was treated as a separate producer in JobManager queue so, keeping + // that same. Should this be changed to ResultIterators.this ? + return this; + } + + @Override + public TaskExecutionMetricsHolder getTaskExecutionMetric() { + return taskMetrics; + } }); futures.add(future); } int i = 0; for (Future future : futures) { Tuple tuple = future.get(); + long taskQueueWaitTime = JobManager.getTaskQueueWaitTime(future); + long taskEndToEndTime = JobManager.getTaskEndToEndTime(future); + maxTaskQueueWaitTime = Math.max(maxTaskQueueWaitTime, taskQueueWaitTime); + maxTaskEndToEndTime = Math.max(maxTaskEndToEndTime, taskEndToEndTime); if (tuple != null) { results.add(new RoundRobinIterator(openIterators.get(i).delegate, tuple)); } else { @@ -279,9 +305,12 @@ public Tuple call() throws Exception { } } } finally { + OverAllQueryMetrics overAllQueryMetrics = + plan.getContext().getOverallQueryMetrics(); + overAllQueryMetrics.updateQueryWaitTime(maxTaskQueueWaitTime); + overAllQueryMetrics.updateQueryTaskEndToEndTime(maxTaskEndToEndTime); if (toThrow != null) { GLOBAL_FAILED_QUERY_COUNTER.increment(); - OverAllQueryMetrics overAllQueryMetrics = plan.getContext().getOverallQueryMetrics(); overAllQueryMetrics.queryFailed(); if (plan.getContext().getScanRanges().isPointLookup()) { overAllQueryMetrics.queryPointLookupFailed(); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java index 8801f0f66b6..7430dce81e2 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; @@ -38,6 +39,7 @@ import javax.annotation.Nullable; +import org.apache.phoenix.iterate.PeekingResultIterator; import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -153,6 +155,10 @@ public JobFutureTask(Callable c) { public Object getJobId() { return jobId; } + + public TaskExecutionMetricsHolder getTaskMetric() { + return taskMetric; + } } /** @@ -300,5 +306,31 @@ private static TaskExecutionMetricsHolder getRequestMetric(Runnable task) { return ((JobFutureTask)task).taskMetric; } } + + private static TaskExecutionMetricsHolder getTaskMetric( + Future futureTask) { + if (futureTask instanceof InstrumentedJobFutureTask) { + TaskExecutionMetricsHolder taskMetrics = + ((InstrumentedJobFutureTask) futureTask).getTaskMetric(); + return taskMetrics; + } + return null; + } + + public static long getTaskQueueWaitTime(Future futureTask) { + TaskExecutionMetricsHolder taskMetrics = getTaskMetric(futureTask); + if (taskMetrics != null) { + return taskMetrics.getTaskQueueWaitTime().getValue(); + } + return 0; + } + + public static long getTaskEndToEndTime(Future futureTask) { + TaskExecutionMetricsHolder taskMetrics = getTaskMetric(futureTask); + if (taskMetrics != null) { + return taskMetrics.getTaskEndToEndTime().getValue(); + } + return 0; + } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java index 40b7932a28c..d0494385a57 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -141,6 +141,11 @@ public enum MetricType { CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed because of splits",LogLevel.DEBUG, PLong.INSTANCE), WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE), RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE), + WALL_CLOCK_QUERY_WAIT_TIME("qwt", "Total wall clock time spent by a " + + "query waiting in the Phoenix client thread pool queue", LogLevel.OFF, PLong.INSTANCE), + WALL_CLOCK_QUERY_TASK_END_TO_END_TIME("qeet", "Total wall clock time " + + "spent by a query in task execution in the Phoenix client thread pool" + + "including the queue wait time", LogLevel.OFF, PLong.INSTANCE), OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE), OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER("io", "Number of open internal phoenix connections",LogLevel.OFF, PLong.INSTANCE), QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE), diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java index 5038cb32387..d11af8053bb 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java @@ -26,6 +26,8 @@ import static org.apache.phoenix.monitoring.MetricType.QUERY_SCAN_TIMEOUT_COUNTER; import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER; import static org.apache.phoenix.monitoring.MetricType.RESULT_SET_TIME_MS; +import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_QUERY_TASK_END_TO_END_TIME; +import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_QUERY_WAIT_TIME; import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_TIME_MS; import java.util.HashMap; @@ -50,6 +52,8 @@ public class OverAllQueryMetrics { private final CombinableMetric queryPointLookupFailed; private final CombinableMetric queryScanFailed; private final CombinableMetric cacheRefreshedDueToSplits; + private final CombinableMetric queryWaitTime; + private final CombinableMetric queryTaskEndToEndTime; public OverAllQueryMetrics(boolean isRequestMetricsEnabled, LogLevel connectionLogLevel) { queryWatch = MetricUtil.getMetricsStopWatch(isRequestMetricsEnabled, connectionLogLevel, @@ -72,6 +76,11 @@ public OverAllQueryMetrics(boolean isRequestMetricsEnabled, LogLevel connectionL queryScanFailed = MetricUtil.getCombinableMetric(isRequestMetricsEnabled,connectionLogLevel, QUERY_SCAN_FAILED_COUNTER); cacheRefreshedDueToSplits = MetricUtil.getCombinableMetric(isRequestMetricsEnabled, connectionLogLevel, CACHE_REFRESH_SPLITS_COUNTER); + queryWaitTime = MetricUtil.getCombinableMetric(isRequestMetricsEnabled, + connectionLogLevel, WALL_CLOCK_QUERY_WAIT_TIME); + queryTaskEndToEndTime = MetricUtil.getCombinableMetric(isRequestMetricsEnabled, + connectionLogLevel, WALL_CLOCK_QUERY_TASK_END_TO_END_TIME); + } public void updateNumParallelScans(long numParallelScans) { @@ -132,6 +141,14 @@ public void stopResultSetWatch() { } } + public void updateQueryWaitTime(long queryWaitTime) { + this.queryWaitTime.change(queryWaitTime); + } + + public void updateQueryTaskEndToEndTime(long queryTaskEndToEndTime) { + this.queryTaskEndToEndTime.change(queryTaskEndToEndTime); + } + @VisibleForTesting long getWallClockTimeMs() { return wallClockTimeMS.getValue(); @@ -154,6 +171,9 @@ public Map publish() { metricsForPublish.put(queryPointLookupFailed.getMetricType(), queryPointLookupFailed.getValue()); metricsForPublish.put(queryScanFailed.getMetricType(), queryScanFailed.getValue()); metricsForPublish.put(cacheRefreshedDueToSplits.getMetricType(), cacheRefreshedDueToSplits.getValue()); + metricsForPublish.put(queryWaitTime.getMetricType(), queryWaitTime.getValue()); + metricsForPublish.put(queryTaskEndToEndTime.getMetricType(), + queryTaskEndToEndTime.getValue()); return metricsForPublish; } @@ -170,6 +190,8 @@ public void reset() { cacheRefreshedDueToSplits.reset(); queryWatch.stop(); resultSetWatch.stop(); + queryWaitTime.reset(); + queryTaskEndToEndTime.reset(); } public OverAllQueryMetrics combine(OverAllQueryMetrics metric) { @@ -183,6 +205,8 @@ public OverAllQueryMetrics combine(OverAllQueryMetrics metric) { numParallelScans.combine(metric.numParallelScans); wallClockTimeMS.combine(metric.wallClockTimeMS); resultSetTimeMS.combine(metric.resultSetTimeMS); + queryWaitTime.combine(metric.queryWaitTime); + queryTaskEndToEndTime.combine(metric.queryTaskEndToEndTime); return this; } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java index f3495fef914..fa8dad2ad7d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -75,6 +75,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.metrics2.AbstractMetric; @@ -84,6 +85,7 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.iterate.RoundRobinResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.jdbc.PhoenixResultSet; @@ -486,7 +488,9 @@ public void testReadMetricsForSelect() throws Exception { Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(query); PhoenixResultSet resultSetBeingTested = rs.unwrap(PhoenixResultSet.class); - changeInternalStateForTesting(resultSetBeingTested); + // TODO use a spy ? + ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(LogLevel.OFF,true); + changeInternalStateForTesting(resultSetBeingTested, testMetricsQueue); while (resultSetBeingTested.next()) {} resultSetBeingTested.close(); Set expectedTableNames = Sets.newHashSet(tableName); @@ -955,12 +959,11 @@ private void assertMetricsHaveSameValues(Map metricNameValueMa } } - private void changeInternalStateForTesting(PhoenixResultSet rs) throws NoSuchFieldException, - SecurityException, IllegalArgumentException, IllegalAccessException { + private void changeInternalStateForTesting(PhoenixResultSet rs, + ReadMetricQueue testMetricsQueue) throws + NoSuchFieldException, SecurityException, IllegalArgumentException, + IllegalAccessException { // get and set the internal state for testing purposes. - // TODO use a spy ? - ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(LogLevel.OFF,true); - Field rsQueueField = PhoenixResultSet.class.getDeclaredField("readMetricsQueue"); rsQueueField.setAccessible(true); rsQueueField.set(rs, testMetricsQueue); @@ -1298,5 +1301,227 @@ public Connection call() throws Exception { } } + @Test + public void testPhoenixClientQueueWaitTimeAndEndToEndTime() throws SQLException, + NoSuchFieldException, + IllegalAccessException { + int saltBucketNum = 8; + String tableName = generateUniqueName(); + String getRows = "SELECT COL1, COL2, PK4, PK2, PK3 FROM " + tableName + + " WHERE PK1=? AND PK2=? AND PK3=? AND PK4 IN (?"; + // Send at least 4 tasks in Phoenix client thread pool and we will assert also on this + // later while actually querying data. + int taskCount = 4; + for (int i = 1; i < taskCount; i++) { + getRows += ", ?"; + } + getRows += ")"; + final String upsertRows = "UPSERT INTO " + tableName + " VALUES(?, ?, ?, ?, ?, ?)"; + String creatTableDdl = "CREATE TABLE IF NOT EXISTS " + tableName + " (\n" + + " PK1 CHAR(15) NOT NULL,\n" + + " PK2 CHAR(15) NOT NULL,\n" + + " PK3 DECIMAL NOT NULL,\n" + + " PK4 CHAR(32) NOT NULL,\n" + + " COL1 VARCHAR NOT NULL,\n" + + " COL2 VARCHAR NOT NULL,\n" + + " CONSTRAINT PK PRIMARY KEY (\n" + + " PK1,\n" + + " PK2,\n" + + " PK3,\n" + + " PK4\n" + + " )\n" + + ") MULTI_TENANT=true, IMMUTABLE_ROWS=TRUE, SALT_BUCKETS=" + saltBucketNum; + long vpk3 = 1000; + try(Connection conn = DriverManager.getConnection(getUrl())) { + PreparedStatement stmt = conn.prepareStatement(upsertRows); + stmt.execute(creatTableDdl); + // Best effort to have 1 row per salt bucket + for (int i = 1; i <= saltBucketNum; i++) { + stmt.setString(1, "VPK1"); + stmt.setString(2, "VPK2"); + stmt.setLong(3, vpk3); + stmt.setString(4, "VPK4_" + i); + stmt.setString(5, "{i:" + i + "}"); + stmt.setString(6, "{o:" + i + "}"); + stmt.executeUpdate(); + } + conn.commit(); + } + try(Connection conn = DriverManager.getConnection(getUrl())) { + PreparedStatement stmt = conn.prepareStatement(getRows); + stmt.setString(1, "VPK1"); + stmt.setString(2, "VPK2"); + stmt.setLong(3, vpk3); + for (int i = 4; i < taskCount + 4; i++) { + stmt.setString(i, "VPK4_" + i); + } + ResultSet rs = stmt.executeQuery(); + PhoenixResultSet phoenixResultSet = rs.unwrap(PhoenixResultSet.class); + ReadMetricQueue readMetricQueue = new TestTaskReadMetricsQueue(LogLevel.OFF, + true); + changeInternalStateForTesting(phoenixResultSet, readMetricQueue); + while(rs.next()) {} + Map overallQueryMetrics = + PhoenixRuntime.getOverAllReadRequestMetricInfo(rs); + Map readMetrics = + PhoenixRuntime.getRequestReadMetricInfo(rs).get(tableName); + // Sent 4 tasks in Phoenix client thread pool. So, that we do see the max task + // queue time and max task end to end time. + assertEquals(4, (long) readMetrics.get(TASK_EXECUTED_COUNTER)); + assertEquals(20, (long) overallQueryMetrics.get( + MetricType.WALL_CLOCK_QUERY_WAIT_TIME)); + assertEquals(41, (long) overallQueryMetrics.get( + MetricType.WALL_CLOCK_QUERY_TASK_END_TO_END_TIME)); + } + } + + /** + * This test aims to test that when scanner cache of first batch of scans is exhausted + * in RoundRobinResultItr and next batch of scans is submitted then overall query wait + * time and query end to end task time is updated correctly. + * + * @throws SQLException + * @throws NoSuchFieldException + * @throws IllegalAccessException + */ + @Test + public void testPhoenixClientQueueWaitTimeAndEndToEndTimeWithScannerCacheRefill() + throws Exception { + int fetchSize = 2; + int saltBucketNum = 8; + String tableName = generateUniqueName(); + String getRows = "SELECT COL1, COL2, PK4, PK2, PK3 FROM " + tableName + + " WHERE PK1=? AND PK2=? AND PK3=? AND PK4 IN (?"; + // Send at least 2 batches of scans. The salt buckets are 8 so, even if first batch of + // scans cover all the salt buckets still only 16 rows will be cached with fetch size of 2. + // So, setting rows to read per query greater than 16. + int taskCount = saltBucketNum * fetchSize + saltBucketNum; + for (int i = 1; i < taskCount; i++) { + getRows += ", ?"; + } + getRows += ")"; + final String upsertRows = "UPSERT INTO " + tableName + " VALUES(?, ?, ?, ?, ?, ?)"; + String creatTableDdl = "CREATE TABLE IF NOT EXISTS " + tableName + " (\n" + + " PK1 CHAR(15) NOT NULL,\n" + + " PK2 CHAR(15) NOT NULL,\n" + + " PK3 DECIMAL NOT NULL,\n" + + " PK4 CHAR(32) NOT NULL,\n" + + " COL1 VARCHAR NOT NULL,\n" + + " COL2 VARCHAR NOT NULL,\n" + + " CONSTRAINT PK PRIMARY KEY (\n" + + " PK1,\n" + + " PK2,\n" + + " PK3,\n" + + " PK4\n" + + " )\n" + + ") MULTI_TENANT=true, IMMUTABLE_ROWS=TRUE, SALT_BUCKETS=" + saltBucketNum; + long vpk3 = 1000; + try(Connection conn = DriverManager.getConnection(getUrl())) { + PreparedStatement stmt = conn.prepareStatement(upsertRows); + stmt.execute(creatTableDdl); + // Best effort to have 1 row per salt bucket + for (int i = 1; i <= saltBucketNum; i++) { + stmt.setString(1, "VPK1"); + stmt.setString(2, "VPK2"); + stmt.setLong(3, vpk3); + stmt.setString(4, "VPK4_" + i); + stmt.setString(5, "{i:" + i + "}"); + stmt.setString(6, "{o:" + i + "}"); + stmt.executeUpdate(); + } + conn.commit(); + } + String url = getUrl("TEST"); + Properties props = new Properties(); + props.setProperty(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, String.valueOf(false)); + try(Connection conn = DriverManager.getConnection(url, props)) { + PreparedStatement stmt = conn.prepareStatement(getRows); + stmt.setFetchSize(fetchSize); + stmt.setString(1, "VPK1"); + stmt.setString(2, "VPK2"); + stmt.setLong(3, vpk3); + for (int i = 4; i < taskCount + 4; i++) { + stmt.setString(i, "VPK4_" + i); + } + ResultSet rs = stmt.executeQuery(); + PhoenixResultSet phoenixResultSet = rs.unwrap(PhoenixResultSet.class); + // Make sure we are using RoundRobinResultItr. as it refills scanner cache + assertTrue(phoenixResultSet.getUnderlyingIterator() + instanceof RoundRobinResultIterator); + RoundRobinResultIterator itr = + (RoundRobinResultIterator) phoenixResultSet.getUnderlyingIterator(); + ReadMetricQueue readMetricQueue = new TestTaskReadMetricsQueue(LogLevel.OFF, + true); + changeInternalStateForTesting(phoenixResultSet, readMetricQueue); + while(rs.next()) {} + Map overallQueryMetrics = + PhoenixRuntime.getOverAllReadRequestMetricInfo(rs); + Map readMetrics = + PhoenixRuntime.getRequestReadMetricInfo(rs).get(tableName); + // Make sure scanner cache was refilled once + assertEquals(1, itr.getNumberOfParallelFetches()); + assertEquals(40, + (long) overallQueryMetrics.get(MetricType.WALL_CLOCK_QUERY_WAIT_TIME)); + assertEquals(82, (long) overallQueryMetrics.get( + MetricType.WALL_CLOCK_QUERY_TASK_END_TO_END_TIME)); + long totalTasksExecuted = readMetrics.get(TASK_EXECUTED_COUNTER); + assertEquals(totalTasksExecuted * TASK_EXECUTION_TIME_DELTA, + (long) readMetrics.get(TASK_EXECUTION_TIME)); + } + } + + private class TestTaskReadMetricsQueue extends ReadMetricQueue { + + int taskQueueWaitTimeIndex = 0; + int taskEndToEndTimeIndex = 0; + int[] taskQueueWaitTime = {10, 5, 20, 7}; + int[] taskEndToEndTime = {20, 15, 41, 16}; + // To make test predictable + final Object lock = new Object(); + + public TestTaskReadMetricsQueue(LogLevel connectionLogLevel, + boolean isRequestMetricsEnabled) { + super(isRequestMetricsEnabled, connectionLogLevel); + } + + @Override + public CombinableMetric getMetric(MetricType type) { + switch (type) { + case TASK_QUEUE_WAIT_TIME: + return new CombinableMetricImpl(type) { + + @Override + public void change(long delta) { + synchronized (lock) { + super.change(taskQueueWaitTime[taskQueueWaitTimeIndex]); + taskQueueWaitTimeIndex++; + taskQueueWaitTimeIndex %= taskQueueWaitTime.length; + } + } + }; + case TASK_END_TO_END_TIME: + return new CombinableMetricImpl(type) { + + @Override + public void change(long delta) { + synchronized (lock) { + super.change(taskEndToEndTime[taskEndToEndTimeIndex]); + taskEndToEndTimeIndex++; + taskEndToEndTimeIndex %= taskEndToEndTime.length; + } + } + }; + case TASK_EXECUTION_TIME: + return new CombinableMetricImpl(type) { + + @Override + public void change(long delta) { + super.change(TASK_EXECUTION_TIME_DELTA); + } + }; + } + return super.getMetric(type); + } + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 1405ad6355c..7d5ebbb849d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -441,6 +441,13 @@ protected static String getUrl() { } return url; } + + protected static String getUrl(String principal) throws Exception { + if (!clusterInitialized) { + throw new IllegalStateException("Cluster must be initialized before attempting to get the URL"); + } + return getLocalClusterUrl(utility, principal); + } protected static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception { if (!clusterInitialized) { @@ -569,6 +576,13 @@ protected static String getLocalClusterUrl(HBaseTestingUtility util) throws Exce String url = QueryUtil.getConnectionUrl(new Properties(), util.getConfiguration()); return url + PHOENIX_TEST_DRIVER_URL_PARAM; } + + protected static String getLocalClusterUrl(HBaseTestingUtility util, String principal) + throws Exception { + String url = QueryUtil.getConnectionUrl(new Properties(), util.getConfiguration(), + principal); + return url + PHOENIX_TEST_DRIVER_URL_PARAM; + } /** * Initialize the cluster in distributed mode From bf88716649ec26b40d337ca56625080c89dbdc14 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Mon, 17 Feb 2025 12:13:57 +0530 Subject: [PATCH 2/7] Minor nits --- .../src/main/java/org/apache/phoenix/job/JobManager.java | 1 - .../org/apache/phoenix/monitoring/PhoenixMetricsIT.java | 8 +++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java index 7430dce81e2..02f10acd718 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java @@ -39,7 +39,6 @@ import javax.annotation.Nullable; -import org.apache.phoenix.iterate.PeekingResultIterator; import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java index fa8dad2ad7d..798fc9a8dc5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -1335,7 +1335,6 @@ public void testPhoenixClientQueueWaitTimeAndEndToEndTime() throws SQLException, try(Connection conn = DriverManager.getConnection(getUrl())) { PreparedStatement stmt = conn.prepareStatement(upsertRows); stmt.execute(creatTableDdl); - // Best effort to have 1 row per salt bucket for (int i = 1; i <= saltBucketNum; i++) { stmt.setString(1, "VPK1"); stmt.setString(2, "VPK2"); @@ -1387,7 +1386,7 @@ public void testPhoenixClientQueueWaitTimeAndEndToEndTime() throws SQLException, @Test public void testPhoenixClientQueueWaitTimeAndEndToEndTimeWithScannerCacheRefill() throws Exception { - int fetchSize = 2; + int fetchSize = 3; int saltBucketNum = 8; String tableName = generateUniqueName(); String getRows = "SELECT COL1, COL2, PK4, PK2, PK3 FROM " + tableName @@ -1395,7 +1394,7 @@ public void testPhoenixClientQueueWaitTimeAndEndToEndTimeWithScannerCacheRefill( // Send at least 2 batches of scans. The salt buckets are 8 so, even if first batch of // scans cover all the salt buckets still only 16 rows will be cached with fetch size of 2. // So, setting rows to read per query greater than 16. - int taskCount = saltBucketNum * fetchSize + saltBucketNum; + int taskCount = saltBucketNum * (fetchSize - 1) + saltBucketNum; for (int i = 1; i < taskCount; i++) { getRows += ", ?"; } @@ -1419,8 +1418,7 @@ public void testPhoenixClientQueueWaitTimeAndEndToEndTimeWithScannerCacheRefill( try(Connection conn = DriverManager.getConnection(getUrl())) { PreparedStatement stmt = conn.prepareStatement(upsertRows); stmt.execute(creatTableDdl); - // Best effort to have 1 row per salt bucket - for (int i = 1; i <= saltBucketNum; i++) { + for (int i = 1; i <= 2 * taskCount; i++) { stmt.setString(1, "VPK1"); stmt.setString(2, "VPK2"); stmt.setLong(3, vpk3); From b480e25377acbf8ef2deea51ff68437f6f4e49e3 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Wed, 26 Feb 2025 01:12:56 +0530 Subject: [PATCH 3/7] Address Hari's comments --- .../phoenix/iterate/BaseResultIterators.java | 8 ++++++-- .../iterate/RoundRobinResultIterator.java | 5 +++-- .../org/apache/phoenix/job/JobManager.java | 20 ++----------------- 3 files changed, 11 insertions(+), 22 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 66baf4a7e8c..ee005fddce0 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -88,6 +88,7 @@ import org.apache.phoenix.job.JobManager; import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.monitoring.OverAllQueryMetrics; +import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.parse.HintNode.Hint; @@ -1472,8 +1473,11 @@ private List getIterators(List> scan, Connecti Future futureTask = scanPair.getSecond(); PeekingResultIterator iterator = futureTask.get(timeOutForScan, TimeUnit.MILLISECONDS); - long taskQueueWaitTime = JobManager.getTaskQueueWaitTime(futureTask); - long taskEndToEndTime = JobManager.getTaskEndToEndTime(futureTask); + TaskExecutionMetricsHolder taskMetricsHolder = + JobManager.getTaskMetrics(futureTask); + long taskQueueWaitTime = + taskMetricsHolder.getTaskQueueWaitTime().getValue(); + long taskEndToEndTime = taskMetricsHolder.getTaskEndToEndTime().getValue(); maxTaskQueueWaitTime = Math.max(maxTaskQueueWaitTime, taskQueueWaitTime); maxTaskEndToEndTime = Math.max(maxTaskEndToEndTime, taskEndToEndTime); concatIterators.add(iterator); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java index e2dee154816..2ddf3063260 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java @@ -273,8 +273,9 @@ public TaskExecutionMetricsHolder getTaskExecutionMetric() { int i = 0; for (Future future : futures) { Tuple tuple = future.get(); - long taskQueueWaitTime = JobManager.getTaskQueueWaitTime(future); - long taskEndToEndTime = JobManager.getTaskEndToEndTime(future); + TaskExecutionMetricsHolder taskMetricsHolder = JobManager.getTaskMetrics(future); + long taskQueueWaitTime = taskMetricsHolder.getTaskQueueWaitTime().getValue(); + long taskEndToEndTime = taskMetricsHolder.getTaskEndToEndTime().getValue(); maxTaskQueueWaitTime = Math.max(maxTaskQueueWaitTime, taskQueueWaitTime); maxTaskEndToEndTime = Math.max(maxTaskEndToEndTime, taskEndToEndTime); if (tuple != null) { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java index 02f10acd718..b3b4fabb68c 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java @@ -306,30 +306,14 @@ private static TaskExecutionMetricsHolder getRequestMetric(Runnable task) { } } - private static TaskExecutionMetricsHolder getTaskMetric( + public static TaskExecutionMetricsHolder getTaskMetrics( Future futureTask) { if (futureTask instanceof InstrumentedJobFutureTask) { TaskExecutionMetricsHolder taskMetrics = ((InstrumentedJobFutureTask) futureTask).getTaskMetric(); return taskMetrics; } - return null; - } - - public static long getTaskQueueWaitTime(Future futureTask) { - TaskExecutionMetricsHolder taskMetrics = getTaskMetric(futureTask); - if (taskMetrics != null) { - return taskMetrics.getTaskQueueWaitTime().getValue(); - } - return 0; - } - - public static long getTaskEndToEndTime(Future futureTask) { - TaskExecutionMetricsHolder taskMetrics = getTaskMetric(futureTask); - if (taskMetrics != null) { - return taskMetrics.getTaskEndToEndTime().getValue(); - } - return 0; + return TaskExecutionMetricsHolder.NO_OP_INSTANCE; } } From f44842178d3878c40bc6193b5143371e45c6a60b Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Wed, 26 Feb 2025 11:21:57 +0530 Subject: [PATCH 4/7] Address Hari's comments --- .../main/java/org/apache/phoenix/job/JobManager.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java index b3b4fabb68c..69d8c63b814 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/job/JobManager.java @@ -126,7 +126,9 @@ protected RunnableFuture newTaskFor(Runnable runnable, T value) { static class JobFutureTask extends FutureTask { private final Object jobId; @Nullable - private final TaskExecutionMetricsHolder taskMetric; + // TODO: Shift this instance variable to InstrumentedJobFutureTask as task metric + // instrumentation happens only for InstrumentedJobFutureTask. + protected final TaskExecutionMetricsHolder taskMetric; public JobFutureTask(Runnable r, T t) { super(r, t); @@ -154,10 +156,6 @@ public JobFutureTask(Callable c) { public Object getJobId() { return jobId; } - - public TaskExecutionMetricsHolder getTaskMetric() { - return taskMetric; - } } /** @@ -198,6 +196,9 @@ public long getTaskExecutionStartTime() { return taskExecutionStartTime; } + public TaskExecutionMetricsHolder getTaskMetric() { + return taskMetric; + } } /** From b79dae81ecc50ad7d5d683cd5a8d4877b157bf0f Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Wed, 26 Feb 2025 17:19:55 +0530 Subject: [PATCH 5/7] Address Hari's comments --- .../java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java index 798fc9a8dc5..1dda2c24db4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -1317,6 +1317,8 @@ public void testPhoenixClientQueueWaitTimeAndEndToEndTime() throws SQLException, } getRows += ")"; final String upsertRows = "UPSERT INTO " + tableName + " VALUES(?, ?, ?, ?, ?, ?)"; + // Here simpler schema can also be used as long we ensure that we create required number + // scans via salting String creatTableDdl = "CREATE TABLE IF NOT EXISTS " + tableName + " (\n" + " PK1 CHAR(15) NOT NULL,\n" + " PK2 CHAR(15) NOT NULL,\n" + @@ -1400,6 +1402,8 @@ public void testPhoenixClientQueueWaitTimeAndEndToEndTimeWithScannerCacheRefill( } getRows += ")"; final String upsertRows = "UPSERT INTO " + tableName + " VALUES(?, ?, ?, ?, ?, ?)"; + // Here simpler schema can also be used as long we ensure that we create required number + // scans via salting String creatTableDdl = "CREATE TABLE IF NOT EXISTS " + tableName + " (\n" + " PK1 CHAR(15) NOT NULL,\n" + " PK2 CHAR(15) NOT NULL,\n" + From bfc41663e4f726363671c756f3c08d94703439f4 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Mon, 24 Mar 2025 12:58:35 +0530 Subject: [PATCH 6/7] Address Tanuj's comments --- .../phoenix/monitoring/PhoenixMetricsIT.java | 95 +++++++++---------- 1 file changed, 43 insertions(+), 52 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java index 1dda2c24db4..58749c03cd6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -145,6 +145,8 @@ public class PhoenixMetricsIT extends BasePhoenixMetricsIT { + " %s WHERE A='keyA1' AND B='keyB1' AND C='keyC1' AND D='keyD1'"; static final String RANGE_SCAN_SELECT_QUERY = "SELECT A, B, C FROM" + " %s WHERE A='keyA1' AND B='keyB1' AND C > 'keyC0'"; + static final int[] TASK_QUEUE_WAIT_TIME_ARR = {10, 5, 20, 7}; + static final int[] TASK_E2E_TIME_ARR = {20, 15, 41, 16}; private static class MyClock extends EnvironmentEdge { private long time; @@ -960,9 +962,7 @@ private void assertMetricsHaveSameValues(Map metricNameValueMa } private void changeInternalStateForTesting(PhoenixResultSet rs, - ReadMetricQueue testMetricsQueue) throws - NoSuchFieldException, SecurityException, IllegalArgumentException, - IllegalAccessException { + ReadMetricQueue testMetricsQueue) throws Exception { // get and set the internal state for testing purposes. Field rsQueueField = PhoenixResultSet.class.getDeclaredField("readMetricsQueue"); rsQueueField.setAccessible(true); @@ -1301,22 +1301,7 @@ public Connection call() throws Exception { } } - @Test - public void testPhoenixClientQueueWaitTimeAndEndToEndTime() throws SQLException, - NoSuchFieldException, - IllegalAccessException { - int saltBucketNum = 8; - String tableName = generateUniqueName(); - String getRows = "SELECT COL1, COL2, PK4, PK2, PK3 FROM " + tableName - + " WHERE PK1=? AND PK2=? AND PK3=? AND PK4 IN (?"; - // Send at least 4 tasks in Phoenix client thread pool and we will assert also on this - // later while actually querying data. - int taskCount = 4; - for (int i = 1; i < taskCount; i++) { - getRows += ", ?"; - } - getRows += ")"; - final String upsertRows = "UPSERT INTO " + tableName + " VALUES(?, ?, ?, ?, ?, ?)"; + private void createSaltedTable(String tableName, int saltBucketNum) throws Exception { // Here simpler schema can also be used as long we ensure that we create required number // scans via salting String creatTableDdl = "CREATE TABLE IF NOT EXISTS " + tableName + " (\n" + @@ -1333,10 +1318,35 @@ public void testPhoenixClientQueueWaitTimeAndEndToEndTime() throws SQLException, " PK4\n" + " )\n" + ") MULTI_TENANT=true, IMMUTABLE_ROWS=TRUE, SALT_BUCKETS=" + saltBucketNum; + try(Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement()) { + stmt.execute(creatTableDdl); + } + } + + private String createQueryWithControlledTasksInPhoenixClientThreadPool( + String tableName, int taskCount) throws Exception { + String getRows = "SELECT COL1, COL2, PK4, PK2, PK3 FROM " + tableName + + " WHERE PK1=? AND PK2=? AND PK3=? AND PK4 IN (?"; + for (int i = 1; i < taskCount; i++) { + getRows += ", ?"; + } + getRows += ")"; + return getRows; + } + + @Test + public void testPhoenixClientQueueWaitTimeAndEndToEndTime() throws Exception { + int saltBucketNum = 8; + String tableName = generateUniqueName(); + // Send at least 4 tasks in Phoenix client thread pool and we will assert also on this + // later while actually querying data. + int taskCount = 4; + createSaltedTable(tableName, saltBucketNum); + final String upsertRows = "UPSERT INTO " + tableName + " VALUES(?, ?, ?, ?, ?, ?)"; long vpk3 = 1000; try(Connection conn = DriverManager.getConnection(getUrl())) { PreparedStatement stmt = conn.prepareStatement(upsertRows); - stmt.execute(creatTableDdl); for (int i = 1; i <= saltBucketNum; i++) { stmt.setString(1, "VPK1"); stmt.setString(2, "VPK2"); @@ -1348,6 +1358,8 @@ public void testPhoenixClientQueueWaitTimeAndEndToEndTime() throws SQLException, } conn.commit(); } + String getRows = createQueryWithControlledTasksInPhoenixClientThreadPool(tableName, + taskCount); try(Connection conn = DriverManager.getConnection(getUrl())) { PreparedStatement stmt = conn.prepareStatement(getRows); stmt.setString(1, "VPK1"); @@ -1359,7 +1371,7 @@ public void testPhoenixClientQueueWaitTimeAndEndToEndTime() throws SQLException, ResultSet rs = stmt.executeQuery(); PhoenixResultSet phoenixResultSet = rs.unwrap(PhoenixResultSet.class); ReadMetricQueue readMetricQueue = new TestTaskReadMetricsQueue(LogLevel.OFF, - true); + true, TASK_QUEUE_WAIT_TIME_ARR, TASK_E2E_TIME_ARR); changeInternalStateForTesting(phoenixResultSet, readMetricQueue); while(rs.next()) {} Map overallQueryMetrics = @@ -1380,10 +1392,6 @@ public void testPhoenixClientQueueWaitTimeAndEndToEndTime() throws SQLException, * This test aims to test that when scanner cache of first batch of scans is exhausted * in RoundRobinResultItr and next batch of scans is submitted then overall query wait * time and query end to end task time is updated correctly. - * - * @throws SQLException - * @throws NoSuchFieldException - * @throws IllegalAccessException */ @Test public void testPhoenixClientQueueWaitTimeAndEndToEndTimeWithScannerCacheRefill() @@ -1391,37 +1399,15 @@ public void testPhoenixClientQueueWaitTimeAndEndToEndTimeWithScannerCacheRefill( int fetchSize = 3; int saltBucketNum = 8; String tableName = generateUniqueName(); - String getRows = "SELECT COL1, COL2, PK4, PK2, PK3 FROM " + tableName - + " WHERE PK1=? AND PK2=? AND PK3=? AND PK4 IN (?"; // Send at least 2 batches of scans. The salt buckets are 8 so, even if first batch of // scans cover all the salt buckets still only 16 rows will be cached with fetch size of 2. // So, setting rows to read per query greater than 16. int taskCount = saltBucketNum * (fetchSize - 1) + saltBucketNum; - for (int i = 1; i < taskCount; i++) { - getRows += ", ?"; - } - getRows += ")"; + createSaltedTable(tableName, saltBucketNum); final String upsertRows = "UPSERT INTO " + tableName + " VALUES(?, ?, ?, ?, ?, ?)"; - // Here simpler schema can also be used as long we ensure that we create required number - // scans via salting - String creatTableDdl = "CREATE TABLE IF NOT EXISTS " + tableName + " (\n" + - " PK1 CHAR(15) NOT NULL,\n" + - " PK2 CHAR(15) NOT NULL,\n" + - " PK3 DECIMAL NOT NULL,\n" + - " PK4 CHAR(32) NOT NULL,\n" + - " COL1 VARCHAR NOT NULL,\n" + - " COL2 VARCHAR NOT NULL,\n" + - " CONSTRAINT PK PRIMARY KEY (\n" + - " PK1,\n" + - " PK2,\n" + - " PK3,\n" + - " PK4\n" + - " )\n" + - ") MULTI_TENANT=true, IMMUTABLE_ROWS=TRUE, SALT_BUCKETS=" + saltBucketNum; long vpk3 = 1000; try(Connection conn = DriverManager.getConnection(getUrl())) { PreparedStatement stmt = conn.prepareStatement(upsertRows); - stmt.execute(creatTableDdl); for (int i = 1; i <= 2 * taskCount; i++) { stmt.setString(1, "VPK1"); stmt.setString(2, "VPK2"); @@ -1433,6 +1419,8 @@ public void testPhoenixClientQueueWaitTimeAndEndToEndTimeWithScannerCacheRefill( } conn.commit(); } + String getRows = createQueryWithControlledTasksInPhoenixClientThreadPool( + tableName, taskCount); String url = getUrl("TEST"); Properties props = new Properties(); props.setProperty(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, String.valueOf(false)); @@ -1453,7 +1441,7 @@ public void testPhoenixClientQueueWaitTimeAndEndToEndTimeWithScannerCacheRefill( RoundRobinResultIterator itr = (RoundRobinResultIterator) phoenixResultSet.getUnderlyingIterator(); ReadMetricQueue readMetricQueue = new TestTaskReadMetricsQueue(LogLevel.OFF, - true); + true, TASK_QUEUE_WAIT_TIME_ARR, TASK_E2E_TIME_ARR); changeInternalStateForTesting(phoenixResultSet, readMetricQueue); while(rs.next()) {} Map overallQueryMetrics = @@ -1476,14 +1464,17 @@ private class TestTaskReadMetricsQueue extends ReadMetricQueue { int taskQueueWaitTimeIndex = 0; int taskEndToEndTimeIndex = 0; - int[] taskQueueWaitTime = {10, 5, 20, 7}; - int[] taskEndToEndTime = {20, 15, 41, 16}; + int[] taskQueueWaitTime; + int[] taskEndToEndTime; // To make test predictable final Object lock = new Object(); public TestTaskReadMetricsQueue(LogLevel connectionLogLevel, - boolean isRequestMetricsEnabled) { + boolean isRequestMetricsEnabled, int[] taskQueueWaitTime, + int[] taskEndToEndTime) { super(isRequestMetricsEnabled, connectionLogLevel); + this.taskQueueWaitTime = taskQueueWaitTime; + this.taskEndToEndTime = taskEndToEndTime; } @Override From 2ccd95dc46b6a57db97922bc0ebea3c8cc5199c1 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Mon, 24 Mar 2025 17:22:46 +0530 Subject: [PATCH 7/7] Address checkstyle bugs --- .../org/apache/phoenix/monitoring/MetricType.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java index d0494385a57..759ab2ed769 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -141,11 +141,12 @@ public enum MetricType { CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed because of splits",LogLevel.DEBUG, PLong.INSTANCE), WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE), RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE), - WALL_CLOCK_QUERY_WAIT_TIME("qwt", "Total wall clock time spent by a " + - "query waiting in the Phoenix client thread pool queue", LogLevel.OFF, PLong.INSTANCE), - WALL_CLOCK_QUERY_TASK_END_TO_END_TIME("qeet", "Total wall clock time " + - "spent by a query in task execution in the Phoenix client thread pool" + - "including the queue wait time", LogLevel.OFF, PLong.INSTANCE), + WALL_CLOCK_QUERY_WAIT_TIME("qwt", "Total wall clock time spent by a " + + "query waiting in the Phoenix client thread pool queue", + LogLevel.OFF, PLong.INSTANCE), + WALL_CLOCK_QUERY_TASK_END_TO_END_TIME("qeet", "Total wall clock time " + + "spent by a query in task execution in the Phoenix client thread pool" + + "including the queue wait time", LogLevel.OFF, PLong.INSTANCE), OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE), OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER("io", "Number of open internal phoenix connections",LogLevel.OFF, PLong.INSTANCE), QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE),