From eb155a211370fe6f8d3871d4178a44de6ba52fe8 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Thu, 17 Jul 2025 17:28:08 +0200 Subject: [PATCH] =?UTF-8?q?MINOR:=20Revert=20"KAFKA-18913:=20Start=20state?= =?UTF-8?q?=20updater=20in=20task=20manager=20(#198=E2=80=A6=20(#20186)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 4d6cf3efef15b3c6b1511c28b882b5486ff1245c. It seemed to trigger a race condition in the state updater initialization. Reviewers: Bill Bejeck --- .../processor/internals/StreamThread.java | 11 +++--- .../processor/internals/TaskManager.java | 5 --- .../processor/internals/StreamThreadTest.java | 38 +++---------------- 3 files changed, 11 insertions(+), 43 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 86e12bf3f65..89cc988313b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -454,7 +454,7 @@ public class StreamThread extends Thread implements ProcessingThread { final DefaultTaskManager schedulingTaskManager = maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks); final StateUpdater stateUpdater = - maybeCreateStateUpdater( + maybeCreateAndStartStateUpdater( stateUpdaterEnabled, streamsMetrics, config, @@ -633,7 +633,7 @@ public class StreamThread extends Thread implements ProcessingThread { return null; } - private static StateUpdater maybeCreateStateUpdater(final boolean stateUpdaterEnabled, + private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateUpdaterEnabled, final StreamsMetricsImpl streamsMetrics, final StreamsConfig streamsConfig, final Consumer restoreConsumer, @@ -644,7 +644,7 @@ public class StreamThread extends Thread implements ProcessingThread { final int threadIdx) { if (stateUpdaterEnabled) { final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx; - return new DefaultStateUpdater( + final StateUpdater stateUpdater = new DefaultStateUpdater( name, streamsMetrics.metricsRegistry(), streamsConfig, @@ -653,6 +653,8 @@ public class StreamThread extends Thread implements ProcessingThread { topologyMetadata, time ); + stateUpdater.start(); + return stateUpdater; } else { return null; } @@ -881,9 +883,6 @@ public class StreamThread extends Thread implements ProcessingThread { } boolean cleanRun = false; try { - if (stateUpdaterEnabled) { - taskManager.init(); - } cleanRun = runLoop(); } catch (final Throwable e) { failedStreamThreadSensor.record(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 3ec3bb78557..9207ec74a7c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -149,11 +149,6 @@ public class TaskManager { ); } - void init() { - if (stateUpdater != null) { - this.stateUpdater.start(); - } - } void setMainConsumer(final Consumer mainConsumer) { this.mainConsumer = mainConsumer; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 9eed49110bb..aa68672b3c0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -109,7 +109,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Mockito; @@ -917,7 +916,6 @@ public class StreamThreadTest { thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); - thread.taskManager().init(); thread.setState(StreamThread.State.PARTITIONS_REVOKED); final TaskId task1 = new TaskId(0, t1p1.partition()); @@ -1291,7 +1289,6 @@ public class StreamThreadTest { thread = createStreamThread(CLIENT_ID, new StreamsConfig(props)); thread.setState(StreamThread.State.STARTING); - thread.taskManager().init(); thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList()); final Map> activeTasks = new HashMap<>(); @@ -1549,7 +1546,6 @@ public class StreamThreadTest { consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null))); thread.setState(StreamThread.State.STARTING); - thread.taskManager().init(); thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); @@ -1615,7 +1611,6 @@ public class StreamThreadTest { internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); thread.setState(StreamThread.State.STARTING); - thread.taskManager().init(); thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); @@ -1698,7 +1693,6 @@ public class StreamThreadTest { internalTopologyBuilder.buildTopology(); thread.setState(StreamThread.State.STARTING); - thread.taskManager().init(); thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); @@ -1793,7 +1787,6 @@ public class StreamThreadTest { consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null))); thread.setState(StreamThread.State.STARTING); - thread.taskManager().init(); thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); @@ -1858,7 +1851,6 @@ public class StreamThreadTest { internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); thread.setState(StreamThread.State.STARTING); - thread.taskManager().init(); thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); @@ -1940,7 +1932,6 @@ public class StreamThreadTest { ); thread.setState(StreamThread.State.STARTING); - thread.taskManager().init(); thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> activeTasks = new HashMap<>(); @@ -2001,7 +1992,6 @@ public class StreamThreadTest { restoreConsumer.updateBeginningOffsets(offsets); thread.setState(StreamThread.State.STARTING); - thread.taskManager().init(); thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final Map> standbyTasks = new HashMap<>(); @@ -2265,7 +2255,6 @@ public class StreamThreadTest { thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); - thread.taskManager().init(); thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final List assignedPartitions = new ArrayList<>(); @@ -2345,7 +2334,6 @@ public class StreamThreadTest { thread = createStreamThread(CLIENT_ID, stateUpdaterEnabled, processingThreadsEnabled); thread.setState(StreamThread.State.STARTING); - thread.taskManager().init(); thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); final List assignedPartitions = new ArrayList<>(); @@ -2543,7 +2531,6 @@ public class StreamThreadTest { thread = createStreamThread(CLIENT_ID, new StreamsConfig(properties)); thread.setState(StreamThread.State.STARTING); - thread.taskManager().init(); thread.setState(StreamThread.State.PARTITIONS_REVOKED); final TaskId task1 = new TaskId(0, t1p1.partition()); @@ -3030,7 +3017,6 @@ public class StreamThreadTest { thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); - thread.taskManager().init(); thread.setState(StreamThread.State.PARTITIONS_REVOKED); final TaskId task1 = new TaskId(0, t1p1.partition()); @@ -3404,7 +3390,6 @@ public class StreamThreadTest { thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); - thread.taskManager().init(); final Map> clientInstanceIdFutures = thread.clientInstanceIds(Duration.ZERO); @@ -3429,7 +3414,6 @@ public class StreamThreadTest { public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); - thread.taskManager().init(); final Map> consumerFutures = thread.clientInstanceIds(Duration.ZERO); @@ -3446,7 +3430,6 @@ public class StreamThreadTest { public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); - thread.taskManager().init(); final Map> consumerFutures = thread.clientInstanceIds(Duration.ZERO); @@ -3463,7 +3446,6 @@ public class StreamThreadTest { public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); - thread.taskManager().init(); final Map> producerFutures = thread.clientInstanceIds(Duration.ZERO); @@ -3481,7 +3463,6 @@ public class StreamThreadTest { clientSupplier.consumer.disableTelemetry(); thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); - thread.taskManager().init(); final Map> consumerFutures = thread.clientInstanceIds(Duration.ZERO); @@ -3499,7 +3480,6 @@ public class StreamThreadTest { thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); - thread.taskManager().init(); final Map> consumerFutures = thread.clientInstanceIds(Duration.ZERO); @@ -3519,7 +3499,6 @@ public class StreamThreadTest { thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); - thread.taskManager().init(); final Map> producerFutures = thread.clientInstanceIds(Duration.ZERO); @@ -3537,7 +3516,6 @@ public class StreamThreadTest { clientSupplier.consumer.injectTimeoutException(-1); thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); - thread.taskManager().init(); final Map> consumerFutures = thread.clientInstanceIds(Duration.ZERO); @@ -3562,7 +3540,6 @@ public class StreamThreadTest { clientSupplier.restoreConsumer.injectTimeoutException(-1); thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); - thread.taskManager().init(); final Map> consumerFutures = thread.clientInstanceIds(Duration.ZERO); @@ -3590,7 +3567,6 @@ public class StreamThreadTest { thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); thread.setState(State.STARTING); - thread.taskManager().init(); final Map> producerFutures = thread.clientInstanceIds(Duration.ZERO); @@ -3607,10 +3583,9 @@ public class StreamThreadTest { ); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testNamedTopologyWithStreamsProtocol(final boolean stateUpdaterEnabled) { - final Properties props = configProps(false, stateUpdaterEnabled, false); + @Test + public void testNamedTopologyWithStreamsProtocol() { + final Properties props = configProps(false, false, false); props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.toString()); final StreamsConfig config = new StreamsConfig(props); final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder( @@ -3667,10 +3642,9 @@ public class StreamThreadTest { assertTrue(thread.streamsRebalanceData().isEmpty()); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testStreamsRebalanceDataWithExtraCopartition(final boolean stateUpdaterEnabled) { - final Properties props = configProps(false, stateUpdaterEnabled, false); + @Test + public void testStreamsRebalanceDataWithExtraCopartition() { + final Properties props = configProps(false, false, false); props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.toString()); internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);