From c034388a0a528bb7cef9aff6dd9c08a3db1e1ad3 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Tue, 8 Nov 2022 17:55:37 +0100 Subject: [PATCH] KAFKA-14299: Avoid allocation & synchronization overhead in StreamThread loop (#12808) The state updater code path introduced allocation and synchronization overhead by performing relatively heavy operations in every iteration of the StreamThread loop. This includes various allocations and acquiring locks for handling `removedTasks` and `failedTasks`, even if the corresponding queues are empty. This change introduces `hasRemovedTasks` and `hasExceptionsAndFailedTasks` in the `StateUpdater` interface that can be used to skip over any allocation or synchronization. The new methods do not require synchronization or memory allocation. This change increases throughput by ~15% in one benchmark. We extend existing unit tests to cover the slightly modified behavior. Reviewer: Bruno Cadonna --- .../processor/internals/DefaultStateUpdater.java | 9 +++++++++ .../processor/internals/StateUpdater.java | 16 ++++++++++++++++ .../streams/processor/internals/TaskManager.java | 8 ++++++-- .../internals/DefaultStateUpdaterTest.java | 16 ++++++++++++++-- .../processor/internals/TaskManagerTest.java | 8 ++++++++ 5 files changed, 53 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index cae9ae3305f..de193eff2e7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -540,6 +540,10 @@ public class DefaultStateUpdater implements StateUpdater { return new HashSet<>(result); } + public boolean hasRemovedTasks() { + return !removedTasks.isEmpty(); + } + @Override public List drainExceptionsAndFailedTasks() { final List 
result = new ArrayList<>(); @@ -547,6 +551,11 @@ public class DefaultStateUpdater implements StateUpdater { return result; } + @Override + public boolean hasExceptionsAndFailedTasks() { + return !exceptionsAndFailedTasks.isEmpty(); + } + public Set getUpdatingStandbyTasks() { return stateUpdaterThread != null ? Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingStandbyTasks())) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java index 94bd5234637..10ac51874d2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java @@ -120,6 +120,14 @@ public interface StateUpdater { */ Set drainRemovedTasks(); + /** + * Checks if the state updater has any tasks that should be removed and returned to the StreamThread + * using `drainRemovedTasks`. + * + * @return true if a subsequent call to `drainRemovedTasks` would return a non-empty collection. + */ + boolean hasRemovedTasks(); + /** * Drains the failed tasks and the corresponding exceptions. * @@ -129,6 +137,14 @@ public interface StateUpdater { */ List drainExceptionsAndFailedTasks(); + /** + * Checks if the state updater has any failed tasks that should be returned to the StreamThread + * using `drainExceptionsAndFailedTasks`. + * + * @return true if a subsequent call to `drainExceptionsAndFailedTasks` would return a non-empty collection. + */ + boolean hasExceptionsAndFailedTasks(); + /** * Gets all tasks that are managed by the state updater. 
* diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 7b1ce52ae7c..e5551bd6840 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -731,8 +731,12 @@ public class TaskManager { public boolean checkStateUpdater(final long now, final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) { addTasksToStateUpdater(); - handleExceptionsFromStateUpdater(); - handleRemovedTasksFromStateUpdater(); + if (stateUpdater.hasExceptionsAndFailedTasks()) { + handleExceptionsFromStateUpdater(); + } + if (stateUpdater.hasRemovedTasks()) { + handleRemovedTasksFromStateUpdater(); + } if (stateUpdater.restoresActiveTasks()) { handleRestoredTasksFromStateUpdater(now, offsetResetter); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index a689ccc1a4d..9f1e642a17e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -1148,6 +1148,7 @@ class DefaultStateUpdaterTest { @Test public void shouldDrainRemovedTasks() throws Exception { + assertFalse(stateUpdater.hasRemovedTasks()); assertTrue(stateUpdater.drainRemovedTasks().isEmpty()); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); @@ -1285,6 +1286,7 @@ class DefaultStateUpdaterTest { @Test public void shouldDrainFailedTasksAndExceptions() throws Exception { + assertFalse(stateUpdater.hasExceptionsAndFailedTasks()); 
assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty()); final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); @@ -1666,13 +1668,18 @@ class DefaultStateUpdaterTest { final Set removedTasks = new HashSet<>(); waitForCondition( () -> { - removedTasks.addAll(stateUpdater.drainRemovedTasks()); + if (stateUpdater.hasRemovedTasks()) { + final Set drainedTasks = stateUpdater.drainRemovedTasks(); + assertFalse(drainedTasks.isEmpty()); + removedTasks.addAll(drainedTasks); + } return removedTasks.containsAll(mkSet(tasks)) && removedTasks.size() == expectedRemovedTasks.size(); }, VERIFICATION_TIMEOUT, "Did not get all restored active task within the given timeout!" ); + assertFalse(stateUpdater.hasRemovedTasks()); assertTrue(stateUpdater.drainRemovedTasks().isEmpty()); } @@ -1713,13 +1720,18 @@ class DefaultStateUpdaterTest { final List failedTasks = new ArrayList<>(); waitForCondition( () -> { - failedTasks.addAll(stateUpdater.drainExceptionsAndFailedTasks()); + if (stateUpdater.hasExceptionsAndFailedTasks()) { + final List exceptionAndTasks = stateUpdater.drainExceptionsAndFailedTasks(); + assertFalse(exceptionAndTasks.isEmpty()); + failedTasks.addAll(exceptionAndTasks); + } return failedTasks.containsAll(expectedExceptionAndTasks) && failedTasks.size() == expectedExceptionAndTasks.size(); }, VERIFICATION_TIMEOUT, "Did not get all exceptions and failed tasks within the given timeout!" 
); + assertFalse(stateUpdater.hasExceptionsAndFailedTasks()); assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty()); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 1220630dd13..023f54dbea4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -782,6 +782,7 @@ public class TaskManagerTest { .withInputPartitions(taskId00Partitions).build(); final StreamTask task01Converted = statefulTask(taskId01, taskId01Partitions) .withInputPartitions(taskId01Partitions).build(); + when(stateUpdater.hasRemovedTasks()).thenReturn(true); when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01)); final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions); @@ -814,6 +815,7 @@ public class TaskManagerTest { final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) .withInputPartitions(taskId01Partitions) .inState(State.RUNNING).build(); + when(stateUpdater.hasRemovedTasks()).thenReturn(true); when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01)); final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.removePendingTaskToRecycle(any())).thenReturn(null); @@ -841,6 +843,7 @@ public class TaskManagerTest { final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) .withInputPartitions(taskId01Partitions) .inState(State.RUNNING).build(); + when(stateUpdater.hasRemovedTasks()).thenReturn(true); when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01)); final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.removePendingTaskToRecycle(any())).thenReturn(null); @@ -870,6 
+873,7 @@ public class TaskManagerTest { when(tasks.removePendingTaskToRecycle(statefulTask.id())).thenReturn(null); when(tasks.removePendingTaskToUpdateInputPartitions(statefulTask.id())).thenReturn(null); when(tasks.removePendingActiveTaskToSuspend(statefulTask.id())).thenReturn(true); + when(stateUpdater.hasRemovedTasks()).thenReturn(true); when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask)); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); replay(consumer); @@ -896,6 +900,7 @@ public class TaskManagerTest { final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions) .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); + when(stateUpdater.hasRemovedTasks()).thenReturn(true); when(stateUpdater.drainRemovedTasks()) .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions)); when(stateUpdater.restoresActiveTasks()).thenReturn(true); @@ -1366,6 +1371,7 @@ public class TaskManagerTest { Collections.singleton(statefulTask), exception ); + when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(true); when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks)); final TasksRegistry tasks = mock(TasksRegistry.class); @@ -1390,6 +1396,7 @@ public class TaskManagerTest { Collections.singleton(statefulTask), exception ); + when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(true); when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks)); final TasksRegistry tasks = mock(TasksRegistry.class); @@ -1421,6 +1428,7 @@ public class TaskManagerTest { Collections.singleton(statefulTask1), new TaskCorruptedException(Collections.singleton(taskId01)) ); + when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(true); when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(exceptionAndTasks0, 
exceptionAndTasks1)); final TasksRegistry tasks = mock(TasksRegistry.class);