Cherrypick lock backoff retry to 3.9 (#18485)

This PR cherry-picks the commits of this and this PR to 3.9.
Both PRs implement an exponential backoff retry for the state directory lock, increasing the time between two consecutive attempts to acquire the lock.
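As a rough illustration (not part of the commit), the following minimal sketch shows how the backoff parameters used in the TaskManager change below behave, assuming Kafka's org.apache.kafka.common.utils.ExponentialBackoff utility with the same settings (1 s initial interval, multiplier 2, 10 s cap, 50% jitter); the class name BackoffDemo is hypothetical:

import org.apache.kafka.common.utils.ExponentialBackoff;

public class BackoffDemo {
    public static void main(final String[] args) {
        // Same parameters as TaskManager.BackoffRecord in the diff below:
        // 1000 ms initial interval, multiplier 2, 10000 ms cap, 0.5 jitter.
        final ExponentialBackoff backoff = new ExponentialBackoff(1000, 2, 10000, 0.5);
        for (long attempts = 1; attempts <= 5; attempts++) {
            // backoff(attempts) returns the wait in ms before the next lock attempt;
            // it grows roughly geometrically, is capped at 10 s, and is randomized
            // by +/-50% so that threads do not retry in lockstep.
            System.out.printf("after attempt %d wait ~%d ms%n", attempts, backoff.backoff(attempts));
        }
    }
}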

Reviewer: Bruno Cadonna <cadonna@apache.org>
Author: Alieh Saeedi 2025-01-21 10:14:28 +01:00, committed by GitHub
parent 35829fddcb
commit 310867a984
5 changed files with 97 additions and 11 deletions


@@ -363,8 +363,6 @@ public class StateDirectory implements AutoCloseable {
throw new IllegalStateException("The state directory has been deleted");
} else {
lockedTasksToOwner.put(taskId, Thread.currentThread());
// make sure the task directory actually exists, and create it if not
getOrCreateDirectoryForTask(taskId);
return true;
}
}
@@ -680,5 +678,4 @@ public class StateDirectory implements AutoCloseable {
return Objects.hash(file, namedTopology);
}
}
}


@@ -28,6 +28,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LockException;
@@ -104,6 +105,8 @@ public class TaskManager {
// includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
private Map<TaskId, BackoffRecord> taskIdToBackoffRecord = new HashMap<>();
private final ActiveTaskCreator activeTaskCreator;
private final StandbyTaskCreator standbyTaskCreator;
private final StateUpdater stateUpdater;
@@ -1007,14 +1010,22 @@ public class TaskManager {
}
private void addTaskToStateUpdater(final Task task) {
final long nowMs = time.milliseconds();
try {
task.initializeIfNeeded();
stateUpdater.add(task);
if (canTryInitializeTask(task.id(), nowMs)) {
task.initializeIfNeeded();
taskIdToBackoffRecord.remove(task.id());
stateUpdater.add(task);
} else {
log.trace("Task {} is still not allowed to retry acquiring the state directory lock", task.id());
tasks.addPendingTasksToInit(Collections.singleton(task));
}
} catch (final LockException lockException) {
// The state directory may still be locked by another thread, when the rebalance just happened.
// Retry in the next iteration.
log.info("Encountered lock exception. Reattempting locking the state in the next iteration.", lockException);
tasks.addPendingTasksToInit(Collections.singleton(task));
updateOrCreateBackoffRecord(task.id(), nowMs);
}
}
@@ -1770,7 +1781,6 @@ public class TaskManager {
return standbyTasksInTaskRegistry;
}
}
// For testing only.
int commitAll() {
return commit(tasks.allTasks());
@@ -2117,4 +2127,37 @@ public class TaskManager {
void addTask(final Task task) {
tasks.addTask(task);
}
private boolean canTryInitializeTask(final TaskId taskId, final long nowMs) {
return !taskIdToBackoffRecord.containsKey(taskId) || taskIdToBackoffRecord.get(taskId).canAttempt(nowMs);
}
private void updateOrCreateBackoffRecord(final TaskId taskId, final long nowMs) {
if (taskIdToBackoffRecord.containsKey(taskId)) {
taskIdToBackoffRecord.get(taskId).recordAttempt(nowMs);
} else {
taskIdToBackoffRecord.put(taskId, new BackoffRecord(nowMs));
}
}
public static class BackoffRecord {
private long attempts;
private long lastAttemptMs;
private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1000, 2, 10000, 0.5);
public BackoffRecord(final long nowMs) {
this.attempts = 1;
this.lastAttemptMs = nowMs;
}
public void recordAttempt(final long nowMs) {
this.attempts++;
this.lastAttemptMs = nowMs;
}
public boolean canAttempt(final long nowMs) {
return nowMs - lastAttemptMs >= EXPONENTIAL_BACKOFF.backoff(attempts);
}
}
}


@@ -54,7 +54,6 @@ import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler.ProcessingHandlerResponse;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;


@@ -1143,7 +1143,7 @@ public class StreamThreadTest {
topologyMetadata.buildAndRewriteTopology();
final TaskManager taskManager = new TaskManager(
null,
new MockTime(),
changelogReader,
null,
null,


@@ -61,7 +61,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Answers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -203,8 +202,6 @@ public class TaskManagerTest {
private Admin adminClient;
@Mock
private ProcessorStateManager stateManager;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ProcessorStateManager.StateStoreMetadata stateStore;
final StateUpdater stateUpdater = mock(StateUpdater.class);
final DefaultTaskManager schedulingTaskManager = mock(DefaultTaskManager.class);
@@ -1247,6 +1244,54 @@ public class TaskManagerTest {
verify(stateUpdater).add(task01);
}
@Test
public void shouldRetryInitializationWithBackoffWhenInitializationFails() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, task01));
doThrow(new LockException("Lock Exception!")).when(task00).initializeIfNeeded();
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
// task00 should not be initialized due to LockException, task01 should be initialized
verify(task00).initializeIfNeeded();
verify(task01).initializeIfNeeded();
verify(tasks).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01))
);
verify(stateUpdater, never()).add(task00);
verify(stateUpdater).add(task01);
time.sleep(500);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
// task00 should not be initialized since the backoff period has not passed
verify(task00, times(1)).initializeIfNeeded();
verify(tasks, times(2)).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00))
);
verify(stateUpdater, never()).add(task00);
time.sleep(5000);
// task00 should call initialize since the backoff period has passed
doNothing().when(task00).initializeIfNeeded();
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
verify(task00, times(2)).initializeIfNeeded();
verify(tasks, times(2)).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00))
);
verify(stateUpdater).add(task00);
}
@Test
public void shouldRethrowRuntimeExceptionInInitTaskWithStateUpdater() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
@@ -1669,6 +1714,8 @@ public class TaskManagerTest {
assertEquals(mkSet(taskId00, taskId01), thrown.corruptedTasks());
assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", thrown.getMessage());
}
@Test
public void shouldAddSubscribedTopicsFromAssignmentToTopologyMetadata() {
final Map<TaskId, Set<TopicPartition>> activeTasksAssignment = mkMap(
mkEntry(taskId01, mkSet(t1p1)),