HOTFIX: Fix StreamThreadTest (#19562)

Commit 732ed06 changed the logic of handling shutdowns, but in parallel
commit 3fae785 had introduced a new unit test for checking how to shut
down, which was broken by the later commit.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-04-25 15:03:39 +02:00 committed by GitHub
parent 965743c35b
commit d087ade527
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 24 additions and 14 deletions

View File

@ -45,6 +45,7 @@ import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
@ -52,6 +53,7 @@ import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogCaptureAppender;
@ -82,7 +84,6 @@ import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StreamThread.State;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager;
@ -178,6 +179,7 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -640,7 +642,7 @@ public class StreamThreadTest {
thread.setState(State.PARTITIONS_REVOKED);
thread.runOnceWithoutProcessingThreads();
Mockito.verify(taskManager, Mockito.never()).process(Mockito.anyInt(), Mockito.any());
Mockito.verify(taskManager, never()).process(Mockito.anyInt(), Mockito.any());
}
@ParameterizedTest
@ -3800,7 +3802,7 @@ public class StreamThreadTest {
Map.of(),
Map.of()
);
final AtomicInteger assignmentErrorCode = new AtomicInteger(0);
final Runnable shutdownErrorHook = mock(Runnable.class);
final Properties props = configProps(false, false, false);
final StreamsConfig config = new StreamsConfig(props);
@ -3819,10 +3821,10 @@ public class StreamThreadTest {
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
assignmentErrorCode,
null,
new AtomicLong(Long.MAX_VALUE),
new LinkedList<>(),
null,
shutdownErrorHook,
HANDLER,
null,
Optional.of(streamsRebalanceData),
@ -3831,11 +3833,15 @@ public class StreamThreadTest {
thread.setState(State.STARTING);
thread.runOnceWithoutProcessingThreads();
assertEquals(0, assignmentErrorCode.get());
verify(shutdownErrorHook, never()).run();
streamsRebalanceData.requestShutdown();
streamsRebalanceData.setStatuses(List.of(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail("Shutdown requested")
));
thread.runOnceWithoutProcessingThreads();
assertEquals(AssignorError.SHUTDOWN_REQUESTED.code(), assignmentErrorCode.get());
verify(shutdownErrorHook).run();
}
@Test
@ -3850,9 +3856,9 @@ public class StreamThreadTest {
Map.of(),
Map.of()
);
final AtomicInteger assignmentErrorCode = new AtomicInteger(0);
final Properties props = configProps(false, false, false);
final Runnable shutdownErrorHook = mock(Runnable.class);
final StreamsConfig config = new StreamsConfig(props);
thread = new StreamThread(
new MockTime(1),
@ -3869,10 +3875,10 @@ public class StreamThreadTest {
PROCESS_ID,
CLIENT_ID,
new LogContext(""),
assignmentErrorCode,
null,
new AtomicLong(Long.MAX_VALUE),
new LinkedList<>(),
null,
shutdownErrorHook,
HANDLER,
null,
Optional.of(streamsRebalanceData),
@ -3881,11 +3887,15 @@ public class StreamThreadTest {
thread.setState(State.STARTING);
thread.runOnceWithProcessingThreads();
assertEquals(0, assignmentErrorCode.get());
verify(shutdownErrorHook, never()).run();
streamsRebalanceData.requestShutdown();
streamsRebalanceData.setStatuses(List.of(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail("Shutdown requested")
));
thread.runOnceWithProcessingThreads();
assertEquals(AssignorError.SHUTDOWN_REQUESTED.code(), assignmentErrorCode.get());
verify(shutdownErrorHook).run();
}
@Test