KAFKA-17553 Fix shutdown race condition in StreamThreadTest (#17191)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Arthur 2024-09-19 03:17:25 -04:00 committed by GitHub
parent 8f5cf9968f
commit 31d395163e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 15 additions and 9 deletions

View File

@ -116,7 +116,7 @@ public class DefaultTaskExecutor implements TaskExecutor {
if (currentTask == null) { if (currentTask == null) {
try { try {
taskManager.awaitProcessableTasks(); taskManager.awaitProcessableTasks(shutdownRequested::get);
} catch (final InterruptedException ignored) { } catch (final InterruptedException ignored) {
// Can be ignored, the cause of the interrupted will be handled in the event loop // Can be ignored, the cause of the interrupted will be handled in the event loop
} }

View File

@ -125,7 +125,7 @@ public class DefaultTaskManager implements TaskManager {
} }
@Override @Override
public void awaitProcessableTasks() throws InterruptedException { public void awaitProcessableTasks(final Supplier<Boolean> isShuttingDown) throws InterruptedException {
final boolean interrupted = returnWithTasksLocked(() -> { final boolean interrupted = returnWithTasksLocked(() -> {
for (final Task task : tasks.activeTasks()) { for (final Task task : tasks.activeTasks()) {
if (!assignedTasks.containsKey(task.id()) && if (!assignedTasks.containsKey(task.id()) &&
@ -138,8 +138,15 @@ public class DefaultTaskManager implements TaskManager {
} }
} }
try { try {
log.debug("Await blocking"); // We re-check the shutdownRequest atomic boolean to avoid a race condition. If this thread was
tasksCondition.await(); // previously interrupted while awaiting tasksCondition, it is possible to miss the signalAll that
// is called during shutdown. If this happens, we end up blocking in the await forever.
if (!isShuttingDown.get()) {
log.debug("Await blocking");
tasksCondition.await();
} else {
log.debug("Not awaiting since shutdown was requested");
}
} catch (final InterruptedException ignored) { } catch (final InterruptedException ignored) {
// we interrupt the thread for shut down and pause. // we interrupt the thread for shut down and pause.
// we can ignore this exception. // we can ignore this exception.

View File

@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.internals.StreamTask;
import java.time.Duration; import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.Supplier;
public interface TaskManager { public interface TaskManager {
@ -136,7 +137,7 @@ public interface TaskManager {
/** /**
* Blocks until unassigned processable tasks may be available. * Blocks until unassigned processable tasks may be available.
*/ */
void awaitProcessableTasks() throws InterruptedException; void awaitProcessableTasks(Supplier<Boolean> isShuttingDown) throws InterruptedException;
/** /**
* Starts all threads associated with this task manager. * Starts all threads associated with this task manager.

View File

@ -95,7 +95,6 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
@ -173,7 +172,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@Timeout(300)
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS) @MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class StreamThreadTest { public class StreamThreadTest {

View File

@ -118,7 +118,7 @@ public class DefaultTaskExecutorTest {
taskExecutor.start(); taskExecutor.start();
verify(taskManager, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).awaitProcessableTasks(); verify(taskManager, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).awaitProcessableTasks(any());
} }
@Test @Test

View File

@ -115,7 +115,7 @@ public class DefaultTaskManagerTest {
public void run() { public void run() {
while (!shutdownRequested.get()) { while (!shutdownRequested.get()) {
try { try {
taskManager.awaitProcessableTasks(); taskManager.awaitProcessableTasks(shutdownRequested::get);
} catch (final InterruptedException ignored) { } catch (final InterruptedException ignored) {
} }
awaitDone.countDown(); awaitDone.countDown();