KAFKA-14299: Avoid allocation & synchronization overhead in StreamThread loop (#12808)

The state updater code path introduced allocation and synchronization
overhead by performing relatively heavy operations in every iteration of
the StreamThread loop. This includes various allocations and acquiring
locks for handling `removedTasks` and `failedTasks`, even if the
corresponding queues are empty.

This change introduces `hasRemovedTasks` and
`hasExceptionsAndFailedTasks` in the `StateUpdater` interface that
can be used to skip over any allocation or synchronization. The new
methods do not require synchronization or memory allocation.

This change increases throughput by ~15% in one benchmark.

We extend existing unit tests to cover the slightly modified
behavior.

Reviewer: Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
Lucas Brutschy 2022-11-08 17:55:37 +01:00 committed by GitHub
parent ac3a3687a0
commit c034388a0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 53 additions and 4 deletions

View File

@ -540,6 +540,10 @@ public class DefaultStateUpdater implements StateUpdater {
return new HashSet<>(result); return new HashSet<>(result);
} }
/**
 * Reports whether {@code removedTasks} currently holds at least one task.
 *
 * <p>Callers can use this as a guard before {@link #drainRemovedTasks()} so that
 * draining is skipped entirely when there is nothing to hand back.
 *
 * @return {@code true} if {@code removedTasks} is not empty, {@code false} otherwise
 */
@Override
public boolean hasRemovedTasks() {
    return !removedTasks.isEmpty();
}
@Override @Override
public List<ExceptionAndTasks> drainExceptionsAndFailedTasks() { public List<ExceptionAndTasks> drainExceptionsAndFailedTasks() {
final List<ExceptionAndTasks> result = new ArrayList<>(); final List<ExceptionAndTasks> result = new ArrayList<>();
@ -547,6 +551,11 @@ public class DefaultStateUpdater implements StateUpdater {
return result; return result;
} }
/**
 * Reports whether {@code exceptionsAndFailedTasks} currently holds at least one entry.
 *
 * <p>Callers can use this as a guard before {@link #drainExceptionsAndFailedTasks()} so that
 * draining is skipped entirely when no failure has been recorded.
 *
 * @return {@code true} if {@code exceptionsAndFailedTasks} is not empty, {@code false} otherwise
 */
@Override
public boolean hasExceptionsAndFailedTasks() {
    final boolean nothingRecorded = exceptionsAndFailedTasks.isEmpty();
    return !nothingRecorded;
}
public Set<StandbyTask> getUpdatingStandbyTasks() { public Set<StandbyTask> getUpdatingStandbyTasks() {
return stateUpdaterThread != null return stateUpdaterThread != null
? Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingStandbyTasks())) ? Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getUpdatingStandbyTasks()))

View File

@ -120,6 +120,14 @@ public interface StateUpdater {
*/ */
Set<Task> drainRemovedTasks(); Set<Task> drainRemovedTasks();
/**
 * Checks if the state updater has any tasks that should be removed and returned to the StreamThread
 * using {@link #drainRemovedTasks()}.
 *
 * <p>Intended as a guard: callers may skip {@link #drainRemovedTasks()} when this method returns {@code false}.
 *
 * @return {@code true} if a subsequent call to {@link #drainRemovedTasks()} would return a non-empty collection.
 */
boolean hasRemovedTasks();
/** /**
* Drains the failed tasks and the corresponding exceptions. * Drains the failed tasks and the corresponding exceptions.
* *
@ -129,6 +137,14 @@ public interface StateUpdater {
*/ */
List<ExceptionAndTasks> drainExceptionsAndFailedTasks(); List<ExceptionAndTasks> drainExceptionsAndFailedTasks();
/**
 * Checks if the state updater has any failed tasks that should be returned to the StreamThread
 * using {@link #drainExceptionsAndFailedTasks()}.
 *
 * <p>Intended as a guard: callers may skip {@link #drainExceptionsAndFailedTasks()} when this method
 * returns {@code false}.
 *
 * @return {@code true} if a subsequent call to {@link #drainExceptionsAndFailedTasks()} would return a non-empty collection.
 */
boolean hasExceptionsAndFailedTasks();
/** /**
* Gets all tasks that are managed by the state updater. * Gets all tasks that are managed by the state updater.
* *

View File

@ -731,8 +731,12 @@ public class TaskManager {
public boolean checkStateUpdater(final long now, public boolean checkStateUpdater(final long now,
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) { final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
addTasksToStateUpdater(); addTasksToStateUpdater();
handleExceptionsFromStateUpdater(); if (stateUpdater.hasExceptionsAndFailedTasks()) {
handleRemovedTasksFromStateUpdater(); handleExceptionsFromStateUpdater();
}
if (stateUpdater.hasRemovedTasks()) {
handleRemovedTasksFromStateUpdater();
}
if (stateUpdater.restoresActiveTasks()) { if (stateUpdater.restoresActiveTasks()) {
handleRestoredTasksFromStateUpdater(now, offsetResetter); handleRestoredTasksFromStateUpdater(now, offsetResetter);
} }

View File

@ -1148,6 +1148,7 @@ class DefaultStateUpdaterTest {
@Test @Test
public void shouldDrainRemovedTasks() throws Exception { public void shouldDrainRemovedTasks() throws Exception {
assertFalse(stateUpdater.hasRemovedTasks());
assertTrue(stateUpdater.drainRemovedTasks().isEmpty()); assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false); when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@ -1285,6 +1286,7 @@ class DefaultStateUpdaterTest {
@Test @Test
public void shouldDrainFailedTasksAndExceptions() throws Exception { public void shouldDrainFailedTasksAndExceptions() throws Exception {
assertFalse(stateUpdater.hasExceptionsAndFailedTasks());
assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty()); assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
@ -1666,13 +1668,18 @@ class DefaultStateUpdaterTest {
final Set<Task> removedTasks = new HashSet<>(); final Set<Task> removedTasks = new HashSet<>();
waitForCondition( waitForCondition(
() -> { () -> {
removedTasks.addAll(stateUpdater.drainRemovedTasks()); if (stateUpdater.hasRemovedTasks()) {
final Set<Task> drainedTasks = stateUpdater.drainRemovedTasks();
assertFalse(drainedTasks.isEmpty());
removedTasks.addAll(drainedTasks);
}
return removedTasks.containsAll(mkSet(tasks)) return removedTasks.containsAll(mkSet(tasks))
&& removedTasks.size() == expectedRemovedTasks.size(); && removedTasks.size() == expectedRemovedTasks.size();
}, },
VERIFICATION_TIMEOUT, VERIFICATION_TIMEOUT,
"Did not get all restored active task within the given timeout!" "Did not get all restored active task within the given timeout!"
); );
assertFalse(stateUpdater.hasRemovedTasks());
assertTrue(stateUpdater.drainRemovedTasks().isEmpty()); assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
} }
@ -1713,13 +1720,18 @@ class DefaultStateUpdaterTest {
final List<ExceptionAndTasks> failedTasks = new ArrayList<>(); final List<ExceptionAndTasks> failedTasks = new ArrayList<>();
waitForCondition( waitForCondition(
() -> { () -> {
failedTasks.addAll(stateUpdater.drainExceptionsAndFailedTasks()); if (stateUpdater.hasExceptionsAndFailedTasks()) {
final List<ExceptionAndTasks> exceptionAndTasks = stateUpdater.drainExceptionsAndFailedTasks();
assertFalse(exceptionAndTasks.isEmpty());
failedTasks.addAll(exceptionAndTasks);
}
return failedTasks.containsAll(expectedExceptionAndTasks) return failedTasks.containsAll(expectedExceptionAndTasks)
&& failedTasks.size() == expectedExceptionAndTasks.size(); && failedTasks.size() == expectedExceptionAndTasks.size();
}, },
VERIFICATION_TIMEOUT, VERIFICATION_TIMEOUT,
"Did not get all exceptions and failed tasks within the given timeout!" "Did not get all exceptions and failed tasks within the given timeout!"
); );
assertFalse(stateUpdater.hasExceptionsAndFailedTasks());
assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty()); assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
} }
} }

View File

@ -782,6 +782,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId00Partitions).build(); .withInputPartitions(taskId00Partitions).build();
final StreamTask task01Converted = statefulTask(taskId01, taskId01Partitions) final StreamTask task01Converted = statefulTask(taskId01, taskId01Partitions)
.withInputPartitions(taskId01Partitions).build(); .withInputPartitions(taskId01Partitions).build();
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01)); when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
final TasksRegistry tasks = mock(TasksRegistry.class); final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions); when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
@ -814,6 +815,7 @@ public class TaskManagerTest {
final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions) .withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build(); .inState(State.RUNNING).build();
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01)); when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
final TasksRegistry tasks = mock(TasksRegistry.class); final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.removePendingTaskToRecycle(any())).thenReturn(null); when(tasks.removePendingTaskToRecycle(any())).thenReturn(null);
@ -841,6 +843,7 @@ public class TaskManagerTest {
final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions) .withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build(); .inState(State.RUNNING).build();
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01)); when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
final TasksRegistry tasks = mock(TasksRegistry.class); final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.removePendingTaskToRecycle(any())).thenReturn(null); when(tasks.removePendingTaskToRecycle(any())).thenReturn(null);
@ -870,6 +873,7 @@ public class TaskManagerTest {
when(tasks.removePendingTaskToRecycle(statefulTask.id())).thenReturn(null); when(tasks.removePendingTaskToRecycle(statefulTask.id())).thenReturn(null);
when(tasks.removePendingTaskToUpdateInputPartitions(statefulTask.id())).thenReturn(null); when(tasks.removePendingTaskToUpdateInputPartitions(statefulTask.id())).thenReturn(null);
when(tasks.removePendingActiveTaskToSuspend(statefulTask.id())).thenReturn(true); when(tasks.removePendingActiveTaskToSuspend(statefulTask.id())).thenReturn(true);
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask)); when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
replay(consumer); replay(consumer);
@ -896,6 +900,7 @@ public class TaskManagerTest {
final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions) final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RESTORING) .inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build(); .withInputPartitions(taskId03Partitions).build();
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks()) when(stateUpdater.drainRemovedTasks())
.thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions)); .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
when(stateUpdater.restoresActiveTasks()).thenReturn(true); when(stateUpdater.restoresActiveTasks()).thenReturn(true);
@ -1366,6 +1371,7 @@ public class TaskManagerTest {
Collections.singleton(statefulTask), Collections.singleton(statefulTask),
exception exception
); );
when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(true);
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks)); when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
final TasksRegistry tasks = mock(TasksRegistry.class); final TasksRegistry tasks = mock(TasksRegistry.class);
@ -1390,6 +1396,7 @@ public class TaskManagerTest {
Collections.singleton(statefulTask), Collections.singleton(statefulTask),
exception exception
); );
when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(true);
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks)); when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
final TasksRegistry tasks = mock(TasksRegistry.class); final TasksRegistry tasks = mock(TasksRegistry.class);
@ -1421,6 +1428,7 @@ public class TaskManagerTest {
Collections.singleton(statefulTask1), Collections.singleton(statefulTask1),
new TaskCorruptedException(Collections.singleton(taskId01)) new TaskCorruptedException(Collections.singleton(taskId01))
); );
when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(true);
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(exceptionAndTasks0, exceptionAndTasks1)); when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(exceptionAndTasks0, exceptionAndTasks1));
final TasksRegistry tasks = mock(TasksRegistry.class); final TasksRegistry tasks = mock(TasksRegistry.class);