Kafka Streams Threading P3: TaskManager Impl (#12754)

0. Add name to task executors.
1. DefaultTaskManager implementation, for interacting with the TaskExecutors and supporting the add/remove/lock APIs.
2. Related unit tests.
This commit is contained in:
Guozhang Wang 2022-10-14 16:10:57 -07:00 committed by GitHub
parent dfb5929665
commit 55a3a95b7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 476 additions and 5 deletions

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.DefaultStateUpdater;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.slf4j.Logger;
@ -45,7 +44,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
super(name);
final String logPrefix = String.format("%s ", name);
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(DefaultStateUpdater.class);
log = logContext.logger(DefaultTaskExecutor.class);
}
@Override
@ -102,6 +101,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
}
private final Time time;
private final String name;
private final TaskManager taskManager;
private StreamTask currentTask = null;
@ -109,15 +109,22 @@ public class DefaultTaskExecutor implements TaskExecutor {
private CountDownLatch shutdownGate;
public DefaultTaskExecutor(final TaskManager taskManager,
final String name,
final Time time) {
this.time = time;
this.name = name;
this.taskManager = taskManager;
}
@Override
public String name() {
return name;
}
@Override
public void start() {
if (taskExecutorThread == null) {
taskExecutorThread = new TaskExecutorThread("task-executor");
taskExecutorThread = new TaskExecutorThread(name);
taskExecutorThread.start();
shutdownGate = new CountDownLatch(1);
}

View File

@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.tasks;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
 * Default {@link TaskManager} implementation that hands active tasks to task executors and lets
 * callers lock tasks for exclusive manipulation.
 *
 * <p>An active task can only be in one of the following states:
 * <ol>
 *   <li>It is assigned to one of the executors for processing.</li>
 *   <li>It is locked for committing, removal, or other manipulations.</li>
 *   <li>Neither 1 nor 2, i.e. it stays idle. This is possible if we do not have enough executors or
 *       because the task is not processable yet (e.g. because no records have been fetched).</li>
 * </ol>
 *
 * <p>The public methods read and update the task registry, the set of locked task ids, and the
 * task-to-executor assignment while holding a single {@link ReentrantLock}.
 */
public class DefaultTaskManager implements TaskManager {

    private final Time time;
    private final Logger log;
    private final TasksRegistry tasks;

    // guards the registry, lockedTasks and assignedTasks; reentrant so that locked methods may call each other
    private final Lock tasksLock = new ReentrantLock();
    // ids of tasks that must not be handed out to executors
    private final List<TaskId> lockedTasks = new ArrayList<>();
    // tasks currently handed out, keyed by task id, mapped to the executor that holds them
    private final Map<TaskId, TaskExecutor> assignedTasks = new HashMap<>();

    private final List<TaskExecutor> taskExecutors;

    /**
     * Creator that produces {@link DefaultTaskExecutor} instances.
     */
    static class DefaultTaskExecutorCreator implements TaskExecutorCreator {

        @Override
        public TaskExecutor create(final TaskManager taskManager, final String name, final Time time) {
            return new DefaultTaskExecutor(taskManager, name, time);
        }
    }

    /**
     * Creates the task manager together with its executors.
     *
     * @param time            time source, used when checking whether a task is processable
     * @param clientId        used as the log prefix and as the prefix of the executor names
     * @param tasks           registry holding the tasks managed by this instance
     * @param config          supplies {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG}, the number of executors to create
     * @param executorCreator factory used to create each executor
     */
    public DefaultTaskManager(final Time time,
                              final String clientId,
                              final TasksRegistry tasks,
                              final StreamsConfig config,
                              final TaskExecutorCreator executorCreator) {
        final String logPrefix = String.format("%s ", clientId);
        final LogContext logContext = new LogContext(logPrefix);
        this.log = logContext.logger(DefaultTaskManager.class);
        this.time = time;
        this.tasks = tasks;

        final int numExecutors = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
        this.taskExecutors = new ArrayList<>(numExecutors);
        // executors are named <clientId>-TaskExecutor-<n> with n starting at 1
        for (int i = 1; i <= numExecutors; i++) {
            final String name = clientId + "-TaskExecutor-" + i;
            this.taskExecutors.add(executorCreator.create(this, name, time));
        }
    }

    /**
     * Assigns the next eligible task to the given executor.
     *
     * @param executor the executor asking for work; must be one created by this task manager
     * @return the assigned task, or {@code null} if no active task is currently unassigned, unlocked and processable
     * @throws IllegalArgumentException if the executor was not created by this task manager
     */
    @Override
    public StreamTask assignNextTask(final TaskExecutor executor) {
        return returnWithTasksLocked(() -> {
            if (!taskExecutors.contains(executor)) {
                throw new IllegalArgumentException("The requested executor for getting next task to assign is unrecognized");
            }

            // the most naive scheduling algorithm for now: give the next unlocked, unassigned, and processable task
            for (final Task task : tasks.activeTasks()) {
                if (!assignedTasks.containsKey(task.id()) &&
                    !lockedTasks.contains(task.id()) &&
                    ((StreamTask) task).isProcessable(time.milliseconds())) {

                    assignedTasks.put(task.id(), executor);

                    log.info("Assigned {} to executor {}", task.id(), executor.name());

                    return (StreamTask) task;
                }
            }

            return null;
        });
    }

    /**
     * Returns a task from the given executor to the pool of unassigned tasks.
     *
     * @param task     the task to unassign
     * @param executor the executor giving the task back
     * @throws IllegalArgumentException if the executor was not created by this task manager, or if the task
     *                                  is not currently assigned to that executor
     */
    @Override
    public void unassignTask(final StreamTask task, final TaskExecutor executor) {
        executeWithTasksLocked(() -> {
            if (!taskExecutors.contains(executor)) {
                throw new IllegalArgumentException("The requested executor for unassign task is unrecognized");
            }

            final TaskExecutor owner = assignedTasks.get(task.id());
            if (owner == null || owner != executor) {
                throw new IllegalArgumentException("Task " + task.id() + " is not locked by the executor");
            }

            assignedTasks.remove(task.id());

            log.info("Unassigned {} from executor {}", task.id(), executor.name());
        });
    }

    /**
     * Locks the given tasks so that they are no longer handed out to executors. Tasks that are
     * currently assigned are requested back from their executor.
     *
     * @param taskIds ids of the tasks to lock
     * @return a future that completes once none of the given tasks is waiting on an executor any more;
     *         it is already completed when {@code taskIds} is empty or none of the tasks is assigned, and it
     *         completes exceptionally if an executor fails to hand back a task
     * @throws IllegalArgumentException if any id does not belong to an owned, active task; in that case
     *                                  no task is locked and no executor is asked to unassign
     */
    @Override
    public KafkaFuture<Void> lockTasks(final Set<TaskId> taskIds) {
        return returnWithTasksLocked(() -> {
            // validate the whole request before touching any state, so that a rejected request
            // neither leaves tasks locked nor asks an executor to give up a task
            for (final TaskId taskId : taskIds) {
                final Task task = tasks.task(taskId);

                if (task == null) {
                    throw new IllegalArgumentException("Trying to lock task " + taskId + " but it's not owned");
                }

                if (!task.isActive()) {
                    throw new IllegalArgumentException("The locking task " + taskId + " is not an active task");
                }
            }

            final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();

            // nothing to wait for: without this the loop below never runs and the future would never complete
            if (taskIds.isEmpty()) {
                result.complete(null);
                return result;
            }

            lockedTasks.addAll(taskIds);

            // concurrent set because the completion callbacks below may run on other threads
            final Set<TaskId> remainingTaskIds = new ConcurrentSkipListSet<>(taskIds);

            for (final TaskId taskId : taskIds) {
                if (assignedTasks.containsKey(taskId)) {
                    final KafkaFuture<StreamTask> future = assignedTasks.get(taskId).unassign();
                    future.whenComplete((streamTask, throwable) -> {
                        if (throwable != null) {
                            result.completeExceptionally(throwable);
                        } else {
                            // use the id this request was made for rather than the returned task's id:
                            // the returned task may be null or may not be the one requested
                            remainingTaskIds.remove(taskId);
                            if (remainingTaskIds.isEmpty()) {
                                result.complete(null);
                            }
                        }
                    });
                } else {
                    remainingTaskIds.remove(taskId);
                    if (remainingTaskIds.isEmpty()) {
                        result.complete(null);
                    }
                }
            }

            return result;
        });
    }

    /**
     * Locks every active task currently in the registry.
     *
     * @return the future returned by {@link #lockTasks(Set)} for the ids of all active tasks
     */
    @Override
    public KafkaFuture<Void> lockAllTasks() {
        return returnWithTasksLocked(() ->
            lockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet()))
        );
    }

    /**
     * Releases the lock on the given tasks so that they can be assigned again.
     *
     * @param taskIds ids of the tasks to unlock
     */
    @Override
    public void unlockTasks(final Set<TaskId> taskIds) {
        executeWithTasksLocked(() -> lockedTasks.removeAll(taskIds));
    }

    /**
     * Releases the lock on every active task currently in the registry.
     */
    @Override
    public void unlockAllTasks() {
        executeWithTasksLocked(() -> unlockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet())));
    }

    /**
     * Adds the given tasks to the registry.
     *
     * @param tasksToAdd the tasks to add
     */
    @Override
    public void add(final Set<StreamTask> tasksToAdd) {
        executeWithTasksLocked(() -> {
            for (final StreamTask task : tasksToAdd) {
                tasks.addTask(task);
            }
        });

        log.info("Added tasks {} to the task manager to process", tasksToAdd);
    }

    /**
     * Removes a task from the registry. The task must be locked and must not be assigned to an executor.
     *
     * @param taskId id of the task to remove
     * @throws IllegalArgumentException if the task is still assigned, is not locked, or is not in the registry
     */
    @Override
    public void remove(final TaskId taskId) {
        executeWithTasksLocked(() -> {
            if (assignedTasks.containsKey(taskId)) {
                throw new IllegalArgumentException("The task to remove is still assigned to executors");
            }

            if (!lockedTasks.contains(taskId)) {
                throw new IllegalArgumentException("The task to remove is not locked yet by the task manager");
            }

            if (!tasks.contains(taskId)) {
                throw new IllegalArgumentException("The task to remove is not owned by the task manager");
            }

            tasks.removeTask(tasks.task(taskId));
        });

        log.info("Removed task {} from the task manager", taskId);
    }

    /**
     * Returns read-only wrappers of all active tasks currently in the registry.
     *
     * @return a set with one {@link ReadOnlyTask} per active task
     */
    @Override
    public Set<ReadOnlyTask> getTasks() {
        return returnWithTasksLocked(() -> tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
    }

    // runs the action while holding tasksLock, releasing the lock even if the action throws
    private void executeWithTasksLocked(final Runnable action) {
        tasksLock.lock();
        try {
            action.run();
        } finally {
            tasksLock.unlock();
        }
    }

    // evaluates the supplier while holding tasksLock, releasing the lock even if the supplier throws
    private <T> T returnWithTasksLocked(final Supplier<T> action) {
        tasksLock.lock();
        try {
            return action.get();
        } finally {
            tasksLock.unlock();
        }
    }
}

View File

@ -25,7 +25,12 @@ import java.time.Duration;
public interface TaskExecutor {
/**
* Starts the task processor.
* @return ID name string of the task executor.
*/
String name();
/**
* Starts the task executor.
*/
void start();

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.tasks;
import org.apache.kafka.common.utils.Time;
/**
 * Factory for {@link TaskExecutor} instances. It has a single abstract method, so it can be
 * supplied as a lambda or method reference.
 */
@FunctionalInterface
public interface TaskExecutorCreator {

    /**
     * Creates a task executor.
     *
     * @param taskManager the task manager passed to the new executor
     * @param name        the name for the new executor
     * @param time        the time source passed to the new executor
     * @return the created executor
     */
    TaskExecutor create(final TaskManager taskManager, final String name, final Time time);
}

View File

@ -45,7 +45,7 @@ public class DefaultTaskExecutorTest {
private final StreamTask task = mock(StreamTask.class);
private final TaskManager taskManager = mock(TaskManager.class);
private final DefaultTaskExecutor taskExecutor = new DefaultTaskExecutor(taskManager, time);
private final DefaultTaskExecutor taskExecutor = new DefaultTaskExecutor(taskManager, "TaskExecutor", time);
@BeforeEach
public void setUp() {

View File

@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.tasks;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.Properties;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
 * Unit tests for {@link DefaultTaskManager}, using a mocked task registry and a single mocked executor.
 */
public class DefaultTaskManagerTest {

    private final Time time = new MockTime(1L);
    private final StreamTask task = mock(StreamTask.class);
    private final TasksRegistry tasks = mock(TasksRegistry.class);
    private final TaskExecutor taskExecutor = mock(TaskExecutor.class);

    private final StreamsConfig config = new StreamsConfig(configProps());
    // the creator always returns the same mock, so the manager's executor is the mock above
    private final TaskManager taskManager = new DefaultTaskManager(time, "TaskManager", tasks, config,
        (manager, name, clock) -> taskExecutor);

    private Properties configProps() {
        return mkObjectProperties(mkMap(
            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2)
        ));
    }

    @BeforeEach
    public void setUp() {
        when(task.id()).thenReturn(new TaskId(0, 0, "A"));
        when(task.isProcessable(anyLong())).thenReturn(true);
        when(task.isActive()).thenReturn(true);
    }

    @Test
    public void shouldAddTask() {
        taskManager.add(Collections.singleton(task));

        verify(tasks).addTask(task);
        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
        assertEquals(1, taskManager.getTasks().size());
    }

    @Test
    public void shouldAssignTask() {
        taskManager.add(Collections.singleton(task));
        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));

        assertEquals(task, taskManager.assignNextTask(taskExecutor));
        // the only task is already assigned, so there is nothing left to hand out
        assertNull(taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldUnassignTask() {
        taskManager.add(Collections.singleton(task));
        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));

        assertEquals(task, taskManager.assignNextTask(taskExecutor));

        taskManager.unassignTask(task, taskExecutor);
        // once given back, the task can be assigned again
        assertEquals(task, taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldNotUnassignNotOwnedTask() {
        taskManager.add(Collections.singleton(task));
        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));

        assertEquals(task, taskManager.assignNextTask(taskExecutor));

        final TaskExecutor anotherExecutor = mock(TaskExecutor.class);
        assertThrows(IllegalArgumentException.class, () -> taskManager.unassignTask(task, anotherExecutor));
    }

    @Test
    public void shouldNotRemoveUnlockedTask() {
        taskManager.add(Collections.singleton(task));

        assertThrows(IllegalArgumentException.class, () -> taskManager.remove(task.id()));
    }

    @Test
    public void shouldNotRemoveAssignedTask() {
        taskManager.add(Collections.singleton(task));
        // the mocked registry must report the task as active, otherwise nothing gets assigned and
        // the removal below would be rejected for being unlocked instead of for being assigned
        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
        assertEquals(task, taskManager.assignNextTask(taskExecutor));

        final IllegalArgumentException exception =
            assertThrows(IllegalArgumentException.class, () -> taskManager.remove(task.id()));
        assertTrue(exception.getMessage().contains("still assigned"));
    }

    @Test
    public void shouldRemoveTask() {
        taskManager.add(Collections.singleton(task));
        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
        when(tasks.task(task.id())).thenReturn(task);
        when(tasks.contains(task.id())).thenReturn(true);

        taskManager.lockTasks(Collections.singleton(task.id()));
        taskManager.remove(task.id());

        verify(tasks).removeTask(task);
        // clear the earlier stubbing so the registry can report that no active tasks remain
        reset(tasks);
        when(tasks.activeTasks()).thenReturn(Collections.emptySet());

        assertEquals(0, taskManager.getTasks().size());
    }

    @Test
    public void shouldNotAssignLockedTask() {
        taskManager.add(Collections.singleton(task));
        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
        when(tasks.task(task.id())).thenReturn(task);
        when(tasks.contains(task.id())).thenReturn(true);

        // the task is not assigned, so the lock is granted immediately
        assertTrue(taskManager.lockTasks(Collections.singleton(task.id())).isDone());

        assertNull(taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldNotAssignAnyLockedTask() {
        taskManager.add(Collections.singleton(task));
        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
        when(tasks.task(task.id())).thenReturn(task);
        when(tasks.contains(task.id())).thenReturn(true);

        assertTrue(taskManager.lockAllTasks().isDone());

        assertNull(taskManager.assignNextTask(taskExecutor));
    }

    @Test
    public void shouldUnassignLockingTask() {
        final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();

        taskManager.add(Collections.singleton(task));
        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
        when(tasks.task(task.id())).thenReturn(task);
        when(tasks.contains(task.id())).thenReturn(true);
        when(taskExecutor.unassign()).thenReturn(future);

        assertEquals(task, taskManager.assignNextTask(taskExecutor));

        // locking an assigned task has to wait until the executor hands the task back
        final KafkaFuture<Void> lockFuture = taskManager.lockAllTasks();
        assertFalse(lockFuture.isDone());

        verify(taskExecutor).unassign();

        future.complete(task);
        assertTrue(lockFuture.isDone());
    }
}