KAFKA-17109: implement exponential backoff for state directory lock (#17116)

This PR implements exponential backoff for state directory lock to increase the time between two consecutive attempts of acquiring the lock.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Alieh Saeedi 2024-09-11 13:26:19 +02:00 committed by GitHub
parent b4e1deb43a
commit d04f534892
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 89 additions and 13 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
@ -97,6 +98,8 @@ public class StateDirectory implements AutoCloseable {
private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();
private final HashMap<TaskId, BackoffRecord> lockedTasksToBackoffRecord = new HashMap<>();
private FileChannel stateDirLockChannel;
private FileLock stateDirLock;
@ -353,9 +356,11 @@ public class StateDirectory implements AutoCloseable {
if (lockOwner.equals(Thread.currentThread())) {
log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
// we already own the lock
lockedTasksToBackoffRecord.remove(taskId);
return true;
} else {
// another thread owns the lock
updateOrCreateBackoffRecord(taskId, System.currentTimeMillis());
return false;
}
} else if (!stateDir.exists()) {
@ -369,6 +374,18 @@ public class StateDirectory implements AutoCloseable {
}
}
public boolean canTryLock(final TaskId taskId, final long nowMs) {
return !lockedTasksToBackoffRecord.containsKey(taskId) || lockedTasksToBackoffRecord.get(taskId).canAttempt(nowMs);
}
private void updateOrCreateBackoffRecord(final TaskId taskId, final long nowMs) {
if (lockedTasksToBackoffRecord.containsKey(taskId)) {
lockedTasksToBackoffRecord.get(taskId).recordAttempt(nowMs);
} else {
lockedTasksToBackoffRecord.put(taskId, new BackoffRecord(nowMs));
}
}
/**
* Unlock the state directory for the given {@link TaskId}.
*/
@ -681,4 +698,24 @@ public class StateDirectory implements AutoCloseable {
}
}
public static class BackoffRecord {
private long attempts;
private long lastAttemptMs;
private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1, 2, 10000, 0.5);
public BackoffRecord(final long nowMs) {
this.attempts = 1;
this.lastAttemptMs = nowMs;
}
public void recordAttempt(final long nowMs) {
this.attempts++;
this.lastAttemptMs = nowMs;
}
public boolean canAttempt(final long nowMs) {
return nowMs - lastAttemptMs >= EXPONENTIAL_BACKOFF.backoff(attempts);
}
}
}

View File

@ -93,9 +93,11 @@ final class StateManagerUtil {
}
final TaskId id = stateMgr.taskId();
if (!stateDirectory.lock(id)) {
if (!stateDirectory.canTryLock(id, System.currentTimeMillis())) {
log.trace("Task {} is still not allowed to retry acquiring the state directory lock", id);
} else if (!stateDirectory.lock(id)) {
throw new LockException(String.format("%sFailed to lock the state directory for task %s", logPrefix, id));
}
} else {
log.debug("Acquired state directory lock");
final boolean storeDirsEmpty = stateDirectory.directoryForTaskIsEmpty(id);
@ -109,6 +111,7 @@ final class StateManagerUtil {
stateMgr.initializeStoreOffsetsFromCheckpoint(storeDirsEmpty);
log.debug("Initialized state stores");
}
}
/**
* @throws ProcessorStateException if there is an error while closing the state manager

View File

@ -81,6 +81,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@ -180,6 +181,7 @@ public class StandbyTaskTest {
@Test
public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
stateDirectory = mock(StateDirectory.class);
when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(true);
when(stateDirectory.lock(taskId)).thenReturn(false);
when(stateManager.taskType()).thenReturn(TaskType.STANDBY);

View File

@ -444,6 +444,22 @@ public class StateDirectoryTest {
assertFalse(directory.lock(taskId));
}
@Test
public void shouldBackoffRetryIfStateDirLockedByAnotherThread() throws Exception {
final TaskId taskId = new TaskId(0, 0);
final Thread thread = new Thread(() -> directory.lock(taskId));
thread.start();
thread.join(30000);
assertTrue(directory.canTryLock(taskId, System.currentTimeMillis()));
assertFalse(directory.lock(taskId));
assertFalse(directory.lock(taskId));
assertFalse(directory.lock(taskId));
// after 3 unsuccessful retries, backoff time is > 0
assertFalse(directory.canTryLock(taskId, System.currentTimeMillis()));
}
@Test
public void shouldNotUnLockStateDirLockedByAnotherThread() throws Exception {
final TaskId taskId = new TaskId(0, 0);

View File

@ -43,8 +43,11 @@ import java.util.List;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mockStatic;
@ -85,6 +88,7 @@ public class StateManagerUtilTest {
public void testRegisterStateStoreFailToLockStateDirectory() {
when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false)));
when(stateManager.taskId()).thenReturn(taskId);
when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(true);
when(stateDirectory.lock(taskId)).thenReturn(false);
final LockException thrown = assertThrows(LockException.class,
@ -94,6 +98,17 @@ public class StateManagerUtilTest {
assertEquals("logPrefix:Failed to lock the state directory for task 0_0", thrown.getMessage());
}
@Test
public void testRegisterStateStoreWhenTryLockIsNotAllowed() {
when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false)));
when(stateManager.taskId()).thenReturn(taskId);
when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(false);
assertDoesNotThrow(() -> StateManagerUtil.registerStateStores(logger, "logPrefix:",
topology, stateManager, stateDirectory, processorContext));
}
@Test
public void testRegisterStateStores() {
final MockKeyValueStore store1 = new MockKeyValueStore("store1", false);
@ -102,6 +117,7 @@ public class StateManagerUtilTest {
final InOrder inOrder = inOrder(stateManager);
when(topology.stateStores()).thenReturn(stateStores);
when(stateManager.taskId()).thenReturn(taskId);
when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(true);
when(stateDirectory.lock(taskId)).thenReturn(true);
when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
when(topology.stateStores()).thenReturn(stateStores);

View File

@ -134,6 +134,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@ -315,6 +316,7 @@ public class StreamTaskTest {
// Clean up state directory created as part of setup
stateDirectory.close();
stateDirectory = mock(StateDirectory.class);
when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(true);
when(stateDirectory.lock(taskId)).thenReturn(false);
task = createStatefulTask(createConfig("100"), false);