From d04f53489256cf19e7faeacfe325f7527f796cf0 Mon Sep 17 00:00:00 2001
From: Alieh Saeedi <107070585+aliehsaeedii@users.noreply.github.com>
Date: Wed, 11 Sep 2024 13:26:19 +0200
Subject: [PATCH] KAFKA-17109: implement exponential backoff for state directory lock (#17116)

This PR implements exponential backoff for state directory lock to increase the time between two consecutive attempts of acquiring the lock.

Reviewers: Lucas Brutschy
---
 .../processor/internals/StateDirectory.java   | 37 +++++++++++++++++++
 .../processor/internals/StateManagerUtil.java | 29 ++++++++-------
 .../processor/internals/StandbyTaskTest.java  |  2 +
 .../internals/StateDirectoryTest.java         | 16 ++++++++
 .../internals/StateManagerUtilTest.java       | 16 ++++++++
 .../processor/internals/StreamTaskTest.java   |  2 +
 6 files changed, 89 insertions(+), 13 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 9336d6874ba..9261835690c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.utils.ExponentialBackoff;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -97,6 +98,8 @@ public class StateDirectory implements AutoCloseable {
 
     private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();
 
+    private final HashMap<TaskId, BackoffRecord> lockedTasksToBackoffRecord = new HashMap<>();
+
     private FileChannel stateDirLockChannel;
     private FileLock stateDirLock;
 
@@ -353,9 +356,11 @@ public class StateDirectory implements AutoCloseable {
             if (lockOwner.equals(Thread.currentThread())) {
                 log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
                 // we already own the lock
+                lockedTasksToBackoffRecord.remove(taskId);
                 return true;
             } else {
                 // another thread owns the lock
+                updateOrCreateBackoffRecord(taskId, System.currentTimeMillis());
                 return false;
             }
         } else if (!stateDir.exists()) {
@@ -369,6 +374,18 @@ public class StateDirectory implements AutoCloseable {
         }
     }
 
+    public boolean canTryLock(final TaskId taskId, final long nowMs) {
+        return !lockedTasksToBackoffRecord.containsKey(taskId) || lockedTasksToBackoffRecord.get(taskId).canAttempt(nowMs);
+    }
+
+    private void updateOrCreateBackoffRecord(final TaskId taskId, final long nowMs) {
+        if (lockedTasksToBackoffRecord.containsKey(taskId)) {
+            lockedTasksToBackoffRecord.get(taskId).recordAttempt(nowMs);
+        } else {
+            lockedTasksToBackoffRecord.put(taskId, new BackoffRecord(nowMs));
+        }
+    }
+
     /**
      * Unlock the state directory for the given {@link TaskId}.
      */
@@ -681,4 +698,24 @@ public class StateDirectory implements AutoCloseable {
         }
     }
 
+    public static class BackoffRecord {
+        private long attempts;
+        private long lastAttemptMs;
+        private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1, 2, 10000, 0.5);
+
+
+        public BackoffRecord(final long nowMs) {
+            this.attempts = 1;
+            this.lastAttemptMs = nowMs;
+        }
+
+        public void recordAttempt(final long nowMs) {
+            this.attempts++;
+            this.lastAttemptMs = nowMs;
+        }
+
+        public boolean canAttempt(final long nowMs) {
+            return nowMs - lastAttemptMs >= EXPONENTIAL_BACKOFF.backoff(attempts);
+        }
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index 4fff7b600fc..3e4337c0b1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -93,21 +93,24 @@ final class StateManagerUtil {
         }
 
         final TaskId id = stateMgr.taskId();
-        if (!stateDirectory.lock(id)) {
+        if (!stateDirectory.canTryLock(id, System.currentTimeMillis())) {
+            log.trace("Task {} is still not allowed to retry acquiring the state directory lock", id);
+        } else if (!stateDirectory.lock(id)) {
             throw new LockException(String.format("%sFailed to lock the state directory for task %s", logPrefix, id));
+        } else {
+            log.debug("Acquired state directory lock");
+
+            final boolean storeDirsEmpty = stateDirectory.directoryForTaskIsEmpty(id);
+
+            stateMgr.registerStateStores(topology.stateStores(), processorContext);
+            log.debug("Registered state stores");
+
+            // We should only load checkpoint AFTER the corresponding state directory lock has been acquired and
+            // the state stores have been registered; we should not try to load at the state manager construction time.
+            // See https://issues.apache.org/jira/browse/KAFKA-8574
+            stateMgr.initializeStoreOffsetsFromCheckpoint(storeDirsEmpty);
+            log.debug("Initialized state stores");
         }
-        log.debug("Acquired state directory lock");
-
-        final boolean storeDirsEmpty = stateDirectory.directoryForTaskIsEmpty(id);
-
-        stateMgr.registerStateStores(topology.stateStores(), processorContext);
-        log.debug("Registered state stores");
-
-        // We should only load checkpoint AFTER the corresponding state directory lock has been acquired and
-        // the state stores have been registered; we should not try to load at the state manager construction time.
-        // See https://issues.apache.org/jira/browse/KAFKA-8574
-        stateMgr.initializeStoreOffsetsFromCheckpoint(storeDirsEmpty);
-        log.debug("Initialized state stores");
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 1d49b3b40b9..4b8fd63eff0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -81,6 +81,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -180,6 +181,7 @@ public class StandbyTaskTest {
     @Test
     public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
         stateDirectory = mock(StateDirectory.class);
+        when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(true);
         when(stateDirectory.lock(taskId)).thenReturn(false);
         when(stateManager.taskType()).thenReturn(TaskType.STANDBY);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 6827c9a4319..153e4ed63d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -444,6 +444,22 @@ public class StateDirectoryTest {
         assertFalse(directory.lock(taskId));
     }
 
+    @Test
+    public void shouldBackoffRetryIfStateDirLockedByAnotherThread() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+        final Thread thread = new Thread(() -> directory.lock(taskId));
+        thread.start();
+        thread.join(30000);
+
+        assertTrue(directory.canTryLock(taskId, System.currentTimeMillis()));
+
+        assertFalse(directory.lock(taskId));
+        assertFalse(directory.lock(taskId));
+        assertFalse(directory.lock(taskId));
+        // after 3 unsuccessful retries, backoff time is > 0
+        assertFalse(directory.canTryLock(taskId, System.currentTimeMillis()));
+    }
+
     @Test
     public void shouldNotUnLockStateDirLockedByAnotherThread() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
index f65945fabbb..b484e395772 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
@@ -43,8 +43,11 @@ import java.util.List;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mockStatic;
@@ -85,6 +88,7 @@ public class StateManagerUtilTest {
     public void testRegisterStateStoreFailToLockStateDirectory() {
         when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false)));
         when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(true);
         when(stateDirectory.lock(taskId)).thenReturn(false);
 
         final LockException thrown = assertThrows(LockException.class,
@@ -94,6 +98,17 @@ public class StateManagerUtilTest {
         assertEquals("logPrefix:Failed to lock the state directory for task 0_0", thrown.getMessage());
     }
 
+    @Test
+    public void testRegisterStateStoreWhenTryLockIsNotAllowed() {
+        when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false)));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(false);
+
+        assertDoesNotThrow(() -> StateManagerUtil.registerStateStores(logger, "logPrefix:",
+            topology, stateManager, stateDirectory, processorContext));
+
+    }
+
     @Test
     public void testRegisterStateStores() {
         final MockKeyValueStore store1 = new MockKeyValueStore("store1", false);
@@ -102,6 +117,7 @@ public class StateManagerUtilTest {
         final InOrder inOrder = inOrder(stateManager);
         when(topology.stateStores()).thenReturn(stateStores);
         when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(true);
         when(stateDirectory.lock(taskId)).thenReturn(true);
         when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
         when(topology.stateStores()).thenReturn(stateStores);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 123334d561b..c227776ef26 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -134,6 +134,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -315,6 +316,7 @@ public class StreamTaskTest {
         // Clean up state directory created as part of setup
         stateDirectory.close();
         stateDirectory = mock(StateDirectory.class);
+        when(stateDirectory.canTryLock(any(), anyLong())).thenReturn(true);
         when(stateDirectory.lock(taskId)).thenReturn(false);
 
         task = createStatefulTask(createConfig("100"), false);