Support retrying non-finished async tasks on startup and periodically #1585


Open
danielhumanmod wants to merge 8 commits into main

Conversation

@danielhumanmod (Contributor) commented on May 14, 2025

Fix #774

Context

Polaris uses async tasks to perform operations such as table and manifest file cleanup. These tasks are executed asynchronously in a separate thread within the same JVM, and retries are handled inline within the task execution. However, this mechanism does not guarantee eventual execution in the following cases:

  • The task fails repeatedly and hits the maximum retry limit.
  • The service crashes or shuts down before retrying.

Implementation Plan

Stage 1: Potential improvement - #1523
Introduce per-task transactional leasing in the metastore layer via loadTasks(...)

Stage 2 (Current PR):
Persist failed tasks and introduce a retry mechanism triggered during Polaris startup and via periodic background checks. Changes include:

  1. Metastore Layer:
    • Exposes a new API, getMetaStoreManagerMap
    • Ensures LAST_ATTEMPT_START_TIME is set whenever a task entity is created; this property drives the time-out filtering in loadTasks() so that multiple executors cannot pick up the same task (see the sketch after this list)
  2. TaskRecoveryManager: New class responsible for task recovery logic, including:
    • Constructing the PolarisCallContext used for execution
    • Loading tasks from the metastore
    • Triggering task execution
  3. QuarkusTaskExecutorImpl: Hooks into the application lifecycle to initiate task recovery.
  4. Task Retry Strategy: Failed tasks remain persisted in the metastore and are retried by the recovery manager.
  5. Tests: Adjusted existing tests and added new coverage for recovery behavior.
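
To make the moving parts above concrete, here is a minimal, self-contained sketch of the recovery loop. The interfaces and names below (RealmMetaStore, TaskSubmitter, loadStaleTaskIds, resubmit, the 20-task page size) are simplified stand-ins for illustration only, not the actual Polaris types or signatures changed in this PR.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the metastore and executor types; the real Polaris
// interfaces (PolarisMetaStoreManager, TaskExecutor, ...) differ in detail.
interface RealmMetaStore {
  // Returns up to `limit` task entities whose LAST_ATTEMPT_START_TIME is older than
  // the lease timeout, stamping them with the caller's executorId as it does so.
  List<Long> loadStaleTaskIds(String executorId, int limit);
}

interface TaskSubmitter {
  void resubmit(String realmId, long taskEntityId);
}

// Sketch of one recovery pass, run at startup and again from a periodic trigger.
final class TaskRecoverySketch {
  private final Map<String, RealmMetaStore> metaStoresByRealm;
  private final TaskSubmitter submitter;
  private final String executorId;

  TaskRecoverySketch(Map<String, RealmMetaStore> metaStoresByRealm,
                     TaskSubmitter submitter,
                     String executorId) {
    this.metaStoresByRealm = metaStoresByRealm;
    this.submitter = submitter;
    this.executorId = executorId;
  }

  void recoverOnce() {
    for (Map.Entry<String, RealmMetaStore> entry : metaStoresByRealm.entrySet()) {
      // Only tasks whose lease has expired are returned, so a task currently being
      // executed by another live executor is left alone.
      for (long taskId : entry.getValue().loadStaleTaskIds(executorId, 20)) {
        submitter.resubmit(entry.getKey(), taskId);
      }
    }
  }
}
```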

Recommended Review Order

  1. Metastore Layer related code
  2. TaskRecoveryManager
  3. QuarkusTaskExecutorImpl and TaskExecutorImpl
  4. Task cleanup handlers
  5. Tests

@@ -152,6 +156,7 @@ public void testTableCleanup() throws IOException {

handler.handleTask(task, callContext);

timeSource.add(Duration.ofMinutes(10));
Contributor Author

Previously a task entity could be created without the LAST_ATTEMPT_START_TIME property, so loadTasks could return it without any time-out having elapsed. Now that every task entity carries this property, the test has to advance the clock past the time-out before loadTasks will return the task again.
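
Put differently, loadTasks only hands out tasks whose last attempt is older than the lease timeout. A tiny illustration of that eligibility check (the 5-minute timeout here is an assumed value, not the one Polaris actually uses):

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: a task stamped with LAST_ATTEMPT_START_TIME at creation stays
// invisible to loadTasks() until the lease timeout elapses, which is why the test
// advances its clock before expecting the task to be picked up again.
public class LeaseTimeoutExample {
  static final Duration LEASE_TIMEOUT = Duration.ofMinutes(5); // assumed value

  static boolean eligibleForPickup(Instant lastAttemptStart, Instant now) {
    return now.isAfter(lastAttemptStart.plus(LEASE_TIMEOUT));
  }

  public static void main(String[] args) {
    Instant created = Instant.parse("2025-05-14T00:00:00Z");
    System.out.println(eligibleForPickup(created, created));                              // false: still leased
    System.out.println(eligibleForPickup(created, created.plus(Duration.ofMinutes(10)))); // true: lease expired
  }
}
```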

Collaborator

Can you explain this further - I'm not sure why the tests need this 10m jump? Is it so that tasks are "recovered" by the Quarkus Scheduled method?

@danielhumanmod changed the title from "Support more reliable async task retry to guarantee eventual execution (2/2) – Task Executor" to "Support retrying non-finished async tasks on startup and periodically" on May 18, 2025
@@ -172,6 +172,11 @@ public Map<String, BaseResult> purgeRealms(Iterable<String> realms) {
return Map.copyOf(results);
}

@Override
public Map<String, PolarisMetaStoreManager> getMetaStoreManagerMap() {
Collaborator

To make this a bit more defensively coded, I might recommend making this an iterator of Map.Entry objects, given that this is a public method and we wouldn't want any code path to be able to modify this mapping?
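
For example, either of these shapes would keep callers from mutating the internal map. This is a generic sketch only; the value type stands in for PolarisMetaStoreManager and the class name is invented:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Generic sketch of two defensive options; V stands in for PolarisMetaStoreManager.
final class MetaStoreRegistry<V> {
  private final Map<String, V> byRealm = new HashMap<>();

  // Option A: return an unmodifiable view so no code path can alter the mapping.
  Map<String, V> getMetaStoreManagerMap() {
    return Collections.unmodifiableMap(byRealm);
  }

  // Option B (the reviewer's suggestion): expose only an iterator over the entries;
  // entries from an unmodifiable view reject setValue().
  Iterator<Map.Entry<String, V>> metaStoreManagerEntries() {
    return Collections.unmodifiableMap(byRealm).entrySet().iterator();
  }
}
```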

}

private void addTaskLocation(TaskEntity task) {
Map<String, String> internalPropertiesAsMap = new HashMap<>(task.getInternalPropertiesAsMap());
Collaborator

addInternalProperty

try {
ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile, fileIO);
Collaborator

What's the reason behind this change?

@@ -193,6 +198,9 @@ private Stream<TaskEntity> getManifestTaskStream(
.withData(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableEntity.getTableIdentifier(), TaskUtils.encodeManifestFile(mf)))
.withLastAttemptExecutorId(executorId)
.withAttemptCount(1)
Collaborator

How can we assume this?

@@ -235,6 +247,9 @@ private Stream<TaskEntity> getMetadataTaskStream(
.withData(
new BatchFileCleanupTaskHandler.BatchFileCleanupTask(
tableEntity.getTableIdentifier(), metadataBatch))
.withLastAttemptExecutorId(executorId)
.withAttemptCount(1)
Collaborator

Ditto as above.

PolarisCallContext polarisCallContext =
new PolarisCallContext(
metastore, new PolarisDefaultDiagServiceImpl(), configurationStore, clock);
EntitiesResult entitiesResult =
Collaborator

I'm not sure I'm understanding the logic here: we are asking for 20 tasks here - but what if there are more than 20 tasks that need recovery?
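
One way to handle more than a single page would be to keep calling the loader until it returns a short page. This is only a sketch around a generic loader function, not the actual loadTasks contract in this PR:

```java
import java.util.List;
import java.util.function.BiFunction;

// Illustrative pagination loop: repeatedly request pages of stale tasks until a
// short (or empty) page signals that nothing is left to recover.
final class RecoveryPagingSketch {
  static <T> int drain(BiFunction<String, Integer, List<T>> loadTasks,
                       String executorId,
                       int pageSize) {
    int recovered = 0;
    while (true) {
      List<T> page = loadTasks.apply(executorId, pageSize);
      recovered += page.size();
      // ... submit each task in `page` for execution here ...
      if (page.size() < pageSize) {
        return recovered;
      }
    }
  }
}
```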


tableCleanupTaskHandler.handleTask(task, callCtx);

// Step 3: Verify that the generated child tasks were registered, ATTEMPT_COUNT = 2
timeSource.add(Duration.ofMinutes(10));
Collaborator

I, personally, found this very hard to follow - even with the comments. I would highly recommend making the comments much more verbose here to allow the full flow of logic (what is happening with which task and why) to be communicated to a reader who may not be an expert at this particular type of task or tasks in general.


Successfully merging this pull request may close these issues: Task handling is incomplete (#774)
2 participants