diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
index 2170550dde..808a445d8e 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
@@ -1489,6 +1489,12 @@ private void revokeGrantRecord(
@Override
public @Nonnull EntitiesResult loadTasks(
@Nonnull PolarisCallContext callCtx, String executorId, int limit) {
+ // Delegate to the four-argument overload, passing false for txnPerTask.
+ return loadTasks(callCtx, executorId, limit, false);
+ }
+
+ @Override
+ public @Nonnull EntitiesResult loadTasks(
+ @Nonnull PolarisCallContext callCtx, String executorId, int limit, boolean txnPerTask) {
BasePersistence ms = callCtx.getMetaStore();
// find all available tasks
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
index da2ab521e1..5a885c108d 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
@@ -296,16 +296,36 @@ EntityResult loadEntity(
@Nonnull PolarisEntityType entityType);
/**
- * Fetch a list of tasks to be completed. Tasks
+ * Fetch a list of tasks to be completed.
+ *
+ * <p>This method uses the default task selection logic, which selects tasks that are either
+ * unassigned or have timed out. All matching tasks are processed within a single transaction.
*
* @param callCtx call context
* @param executorId executor id
- * @param limit limit
- * @return list of tasks to be completed
+ * @param limit max number of tasks to lease
+ * @return list of leased tasks
*/
@Nonnull
EntitiesResult loadTasks(@Nonnull PolarisCallContext callCtx, String executorId, int limit);
+ /**
+ * Fetch a list of tasks to be completed with customizable behavior.
+ *
+ * <p>Supports per-task transactional execution. When {@code txnPerTask} is true, each task is
+ * leased within its own transaction, so that a single failed lease does not abort the entire
+ * batch.
+ *
+ * @param callCtx call context
+ * @param executorId executor id
+ * @param limit max number of tasks to lease
+ * @param txnPerTask whether to lease each task in its own transaction
+ * @return list of successfully leased tasks
+ */
+ @Nonnull
+ EntitiesResult loadTasks(
+ @Nonnull PolarisCallContext callCtx, String executorId, int limit, boolean txnPerTask);
+
/**
* Load change tracking information for a set of entities in one single shot and return for each
* the version for the entity itself and the version associated to its grant records.
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
index 9b5a6b6dbe..bb4952bb01 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
@@ -325,6 +325,13 @@ public EntitiesResult loadTasks(
return null;
}
+ @Override
+ public EntitiesResult loadTasks(
+ @Nonnull PolarisCallContext callCtx, String executorId, int limit, boolean txnPerTask) {
+ // Report the call through the diagnostic services under the
+ // "illegal_method_in_transaction_workspace" signature instead of loading any task.
+ // NOTE(review): fail(...) presumably throws, which would make the null return below
+ // unreachable in practice; confirm against the diagnostics implementation.
+ callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "loadTasks");
+ return null;
+ }
+
@Override
public ScopedCredentialsResult getSubscopedCredsForEntity(
@Nonnull PolarisCallContext callCtx,
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
index 9f508eb4e1..572123428c 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
@@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.polaris.core.PolarisCallContext;
@@ -1973,9 +1974,6 @@ private PolarisEntityResolver resolveSecurableToRoleGrant(
if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) {
loadedTasks.add(result.getEntity());
} else {
- // TODO: Consider performing incremental leasing of individual tasks one at a time
- // instead of requiring all-or-none semantics for all the tasks we think we listed,
- // or else contention could be very bad.
ms.rollback();
throw new RetryOnConcurrencyException(
"Failed to lease available task with status %s, info: %s",
@@ -1985,11 +1983,60 @@ private PolarisEntityResolver resolveSecurableToRoleGrant(
return new EntitiesResult(loadedTasks);
}
+  /**
+   * Leases up to {@code limit} tasks, running every lease attempt in its own transaction so that a
+   * failed attempt does not undo the tasks leased by earlier attempts.
+   *
+   * @param callCtx call context
+   * @param ms transactional persistence used to run each lease attempt
+   * @param executorId executor id the tasks are leased to
+   * @param limit max number of lease attempts, and therefore max number of tasks returned
+   * @return the tasks that were leased successfully; empty when nothing was available
+   * @throws RetryOnConcurrencyException if no attempt succeeded and at least one attempt failed
+   */
+  private @Nonnull EntitiesResult loadTasksWithIsolatedTxn(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull TransactionalPersistence ms,
+      String executorId,
+      int limit) {
+    List<EntitiesResult> entitySuccessResults = new ArrayList<>();
+    final AtomicInteger failedLeaseCount = new AtomicInteger(0);
+    for (int i = 0; i < limit; i++) {
+      try {
+        // Each attempt leases at most one task inside its own transaction.
+        EntitiesResult result =
+            ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms, executorId, 1));
+        if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) {
+          entitySuccessResults.add(result);
+          if (result.getEntities().isEmpty()) {
+            // No task was available to lease: the remaining attempts would only open empty
+            // transactions, so stop early.
+            break;
+          }
+        } else {
+          failedLeaseCount.incrementAndGet();
+          LOGGER.warn(
+              "Fail to lease task, error status: {}, error info: {}",
+              result.getReturnStatus(),
+              result.getExtraInformation());
+        }
+      } catch (Exception e) {
+        // Best effort: a failed attempt must not abort the batch. The throwable is passed as the
+        // trailing argument so that the stack trace is kept in the log.
+        failedLeaseCount.incrementAndGet();
+        LOGGER.warn("Exception while leasing task: {}", e.getMessage(), e);
+      }
+    }
+
+    // Only ask the caller to retry when every attempt failed; a successful attempt that found no
+    // task means there is simply nothing to lease.
+    if (entitySuccessResults.isEmpty() && failedLeaseCount.get() > 0) {
+      throw new RetryOnConcurrencyException(
+          "Failed to lease any of %s tasks due to concurrent leases", failedLeaseCount.get());
+    }
+
+    List<PolarisBaseEntity> entities =
+        entitySuccessResults.stream()
+            .flatMap(result -> result.getEntities().stream())
+            .collect(Collectors.toList());
+
+    return new EntitiesResult(entities);
+  }
+
+
@Override
public @Nonnull EntitiesResult loadTasks(
@Nonnull PolarisCallContext callCtx, String executorId, int limit) {
+ // Default behavior: delegate to the four-argument overload with per-task transactions disabled.
+ return loadTasks(callCtx, executorId, limit, false);
+ }
+
+  @Override
+  public @Nonnull EntitiesResult loadTasks(
+      @Nonnull PolarisCallContext callCtx, String executorId, int limit, boolean txnPerTask) {
     TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore());
-    return ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms, executorId, limit));
+    if (!txnPerTask) {
+      // Single transaction: all tasks are leased together.
+      return ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms, executorId, limit));
+    } else {
+      // One transaction per task: a failed lease does not undo the others.
+      return loadTasksWithIsolatedTxn(callCtx, ms, executorId, limit);
+    }
   }
/** {@inheritDoc} */
diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
index f583137120..8147d679e2 100644
--- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
+++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
@@ -47,6 +47,8 @@
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.threeten.extra.MutableClock;
/**
@@ -297,8 +299,9 @@ protected void testPolicyMappingCleanup() {
polarisTestMetaStoreManager.testPolicyMappingCleanup();
}
- @Test
- protected void testLoadTasks() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ protected void testLoadTasks(boolean txnPerTask) {
for (int i = 0; i < 20; i++) {
polarisTestMetaStoreManager.createEntity(
null, PolarisEntityType.TASK, PolarisEntitySubType.NULL_SUBTYPE, "task_" + i);
@@ -307,7 +310,7 @@ protected void testLoadTasks() {
PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager;
PolarisCallContext callCtx = polarisTestMetaStoreManager.polarisCallContext;
List taskList =
- metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
+ metaStoreManager.loadTasks(callCtx, executorId, 5, txnPerTask).getEntities();
Assertions.assertThat(taskList)
.isNotNull()
.isNotEmpty()
@@ -327,7 +330,7 @@ protected void testLoadTasks() {
// grab a second round of tasks. Assert that none of the original 5 are in the list
List newTaskList =
- metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
+ metaStoreManager.loadTasks(callCtx, executorId, 5, txnPerTask).getEntities();
Assertions.assertThat(newTaskList)
.isNotNull()
.isNotEmpty()
@@ -341,7 +344,7 @@ protected void testLoadTasks() {
// only 10 tasks are unassigned. Requesting 20, we should only receive those 10
List lastTen =
- metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
+ metaStoreManager.loadTasks(callCtx, executorId, 20, txnPerTask).getEntities();
Assertions.assertThat(lastTen)
.isNotNull()
@@ -355,7 +358,7 @@ protected void testLoadTasks() {
.collect(Collectors.toSet());
List emtpyList =
- metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
+ metaStoreManager.loadTasks(callCtx, executorId, 20, txnPerTask).getEntities();
Assertions.assertThat(emtpyList).isNotNull().isEmpty();
@@ -363,7 +366,7 @@ protected void testLoadTasks() {
// all the tasks are unassigned. Fetch them all
List allTasks =
- metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
+ metaStoreManager.loadTasks(callCtx, executorId, 20, txnPerTask).getEntities();
Assertions.assertThat(allTasks)
.isNotNull()
@@ -378,13 +381,14 @@ protected void testLoadTasks() {
timeSource.add(Duration.ofMinutes(10));
List finalList =
- metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities();
+ metaStoreManager.loadTasks(callCtx, executorId, 20, txnPerTask).getEntities();
Assertions.assertThat(finalList).isNotNull().isEmpty();
}
- @Test
- protected void testLoadTasksInParallel() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ protected void testLoadTasksInParallel(boolean txnPerTask) throws Exception {
for (int i = 0; i < 100; i++) {
polarisTestMetaStoreManager.createEntity(
null, PolarisEntityType.TASK, PolarisEntitySubType.NULL_SUBTYPE, "task_" + i);
@@ -406,7 +410,10 @@ protected void testLoadTasksInParallel() throws Exception {
do {
retry = false;
try {
- taskList = metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
+ taskList =
+ metaStoreManager
+ .loadTasks(callCtx, executorId, 5, txnPerTask)
+ .getEntities();
taskList.stream().map(PolarisBaseEntity::getName).forEach(taskNames::add);
} catch (RetryOnConcurrencyException e) {
retry = true;
@@ -442,6 +449,143 @@ protected void testLoadTasksInParallel() throws Exception {
.allSatisfy((k, v) -> Assertions.assertThat(v).isEqualTo(1));
}
+  /**
+   * Leases a first batch in a single transaction, then checks that loading with one transaction
+   * per task returns only the remaining tasks, returns nothing while every task is leased, returns
+   * all tasks again once the clock has been advanced, and returns nothing after they are dropped.
+   */
+  @Test
+  protected void testLoadTasksOnCompensation() {
+    for (int i = 0; i < 20; i++) {
+      polarisTestMetaStoreManager.createEntity(
+          null, PolarisEntityType.TASK, PolarisEntitySubType.NULL_SUBTYPE, "task_" + i);
+    }
+    String executorId = "testExecutor_abc";
+    PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager;
+    PolarisCallContext callCtx = polarisTestMetaStoreManager.polarisCallContext;
+
+    // First round of tasks are loaded as before, mock usual scheduling process
+    List<PolarisBaseEntity> taskList =
+        metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities();
+    Assertions.assertThat(taskList)
+        .isNotNull()
+        .isNotEmpty()
+        .hasSize(5)
+        .allSatisfy(
+            entry ->
+                Assertions.assertThat(entry)
+                    .extracting(
+                        e ->
+                            PolarisObjectMapperUtil.deserializeProperties(
+                                callCtx, e.getProperties()))
+                    .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
+                    .containsEntry("lastAttemptExecutorId", executorId)
+                    .containsEntry("attemptCount", "1"));
+    Set<String> firstFiveTasks =
+        taskList.stream().map(PolarisBaseEntity::getName).collect(Collectors.toSet());
+
+    // Remaining tasks are loaded with one transaction per task, mock periodic compensation
+    List<PolarisBaseEntity> remainingTaskList =
+        metaStoreManager.loadTasks(callCtx, executorId, 20, true).getEntities();
+    Assertions.assertThat(remainingTaskList)
+        .isNotNull()
+        .isNotEmpty()
+        .hasSize(15)
+        .extracting(PolarisBaseEntity::getName)
+        .noneMatch(firstFiveTasks::contains);
+
+    Set<String> allTaskNames =
+        Stream.concat(
+                firstFiveTasks.stream(), remainingTaskList.stream().map(PolarisBaseEntity::getName))
+            .collect(Collectors.toSet());
+
+    // Every task is currently leased, so loading again must return nothing
+    List<PolarisBaseEntity> emptyList =
+        metaStoreManager.loadTasks(callCtx, executorId, 20, true).getEntities();
+
+    Assertions.assertThat(emptyList).isNotNull().isEmpty();
+
+    // Skip the clock forward so that all 20 tasks can be loaded again. Fetch them all
+    timeSource.add(Duration.ofMinutes(10));
+    List<PolarisBaseEntity> allTasks =
+        metaStoreManager.loadTasks(callCtx, executorId, 20, true).getEntities();
+
+    Assertions.assertThat(allTasks)
+        .isNotNull()
+        .isNotEmpty()
+        .hasSize(20)
+        .extracting(PolarisBaseEntity::getName)
+        .allMatch(allTaskNames::contains);
+
+    // drop all the tasks. Skip the clock forward and fetch. empty list expected
+    allTasks.forEach(
+        entity -> metaStoreManager.dropEntityIfExists(callCtx, null, entity, Map.of(), false));
+    timeSource.add(Duration.ofMinutes(10));
+
+    List<PolarisBaseEntity> finalList =
+        metaStoreManager.loadTasks(callCtx, executorId, 20, true).getEntities();
+
+    Assertions.assertThat(finalList).isNotNull().isEmpty();
+  }
+
+
+  /**
+   * Leases every task with a first executor, advances the clock, and checks that a second executor
+   * loading with one transaction per task takes over all of them with an incremented attempt
+   * count. After the tasks are dropped, nothing is returned.
+   */
+  @Test
+  protected void testLoadTasksOnStartupRecovery() {
+    for (int i = 0; i < 20; i++) {
+      polarisTestMetaStoreManager.createEntity(
+          null, PolarisEntityType.TASK, PolarisEntitySubType.NULL_SUBTYPE, "task_" + i);
+    }
+    String firstExecutorId = "testExecutor_1";
+    PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager;
+    PolarisCallContext callCtx = polarisTestMetaStoreManager.polarisCallContext;
+
+    // First round of tasks are loaded as before, mock usual scheduling process
+    List<PolarisBaseEntity> firstRoundTaskList =
+        metaStoreManager.loadTasks(callCtx, firstExecutorId, 20).getEntities();
+    Assertions.assertThat(firstRoundTaskList)
+        .isNotNull()
+        .isNotEmpty()
+        .hasSize(20)
+        .allSatisfy(
+            entry ->
+                Assertions.assertThat(entry)
+                    .extracting(
+                        e ->
+                            PolarisObjectMapperUtil.deserializeProperties(
+                                callCtx, e.getProperties()))
+                    .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
+                    .containsEntry("lastAttemptExecutorId", firstExecutorId)
+                    .containsEntry("attemptCount", "1"));
+    Set<String> firstRoundTaskNames =
+        firstRoundTaskList.stream().map(PolarisBaseEntity::getName).collect(Collectors.toSet());
+
+    // Mock service dies, try with another executor and load all tasks
+    timeSource.add(Duration.ofMinutes(10));
+    String secondExecutorId = "testExecutor_2";
+    List<PolarisBaseEntity> secondRoundTaskList =
+        metaStoreManager.loadTasks(callCtx, secondExecutorId, 20, true).getEntities();
+    Assertions.assertThat(secondRoundTaskList)
+        .isNotNull()
+        .isNotEmpty()
+        .hasSize(20)
+        .allSatisfy(
+            entry ->
+                Assertions.assertThat(entry)
+                    .extracting(
+                        e ->
+                            PolarisObjectMapperUtil.deserializeProperties(
+                                callCtx, e.getProperties()))
+                    .asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
+                    .containsEntry("lastAttemptExecutorId", secondExecutorId)
+                    .containsEntry("attemptCount", "2"))
+        .extracting(PolarisBaseEntity::getName)
+        .allMatch(firstRoundTaskNames::contains);
+
+    // drop all the tasks. Skip the clock forward and fetch. empty list expected
+    secondRoundTaskList.forEach(
+        entity -> metaStoreManager.dropEntityIfExists(callCtx, null, entity, Map.of(), false));
+    timeSource.add(Duration.ofMinutes(10));
+
+    List<PolarisBaseEntity> finalList =
+        metaStoreManager.loadTasks(callCtx, secondExecutorId, 20, true).getEntities();
+
+    Assertions.assertThat(finalList).isNotNull().isEmpty();
+  }
+
+
/** Test generateNewEntityId() function that generates unique ids by creating Tasks in parallel */
@Test
protected void testCreateTasksInParallel() throws Exception {