MINOR: Revert "KAFKA-18913: Start state updater in task manager (#198… (#20186)
CI / build (push) Has been cancelled Details

This reverts commit 4d6cf3efef. It seemed
to trigger a race condition in the state updater initialization.

Reviewers: Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
Lucas Brutschy 2025-07-17 17:28:08 +02:00 committed by GitHub
parent 05f012c7f1
commit eb155a2113
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 11 additions and 43 deletions

View File

@ -454,7 +454,7 @@ public class StreamThread extends Thread implements ProcessingThread {
final DefaultTaskManager schedulingTaskManager =
maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
final StateUpdater stateUpdater =
maybeCreateStateUpdater(
maybeCreateAndStartStateUpdater(
stateUpdaterEnabled,
streamsMetrics,
config,
@ -633,7 +633,7 @@ public class StreamThread extends Thread implements ProcessingThread {
return null;
}
private static StateUpdater maybeCreateStateUpdater(final boolean stateUpdaterEnabled,
private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateUpdaterEnabled,
final StreamsMetricsImpl streamsMetrics,
final StreamsConfig streamsConfig,
final Consumer<byte[], byte[]> restoreConsumer,
@ -644,7 +644,7 @@ public class StreamThread extends Thread implements ProcessingThread {
final int threadIdx) {
if (stateUpdaterEnabled) {
final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx;
return new DefaultStateUpdater(
final StateUpdater stateUpdater = new DefaultStateUpdater(
name,
streamsMetrics.metricsRegistry(),
streamsConfig,
@ -653,6 +653,8 @@ public class StreamThread extends Thread implements ProcessingThread {
topologyMetadata,
time
);
stateUpdater.start();
return stateUpdater;
} else {
return null;
}
@ -881,9 +883,6 @@ public class StreamThread extends Thread implements ProcessingThread {
}
boolean cleanRun = false;
try {
if (stateUpdaterEnabled) {
taskManager.init();
}
cleanRun = runLoop();
} catch (final Throwable e) {
failedStreamThreadSensor.record();

View File

@ -149,11 +149,6 @@ public class TaskManager {
);
}
void init() {
if (stateUpdater != null) {
this.stateUpdater.start();
}
}
void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
this.mainConsumer = mainConsumer;
}

View File

@ -109,7 +109,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
@ -917,7 +916,6 @@ public class StreamThreadTest {
thread = createStreamThread(CLIENT_ID, config);
thread.setState(StreamThread.State.STARTING);
thread.taskManager().init();
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
final TaskId task1 = new TaskId(0, t1p1.partition());
@ -1291,7 +1289,6 @@ public class StreamThreadTest {
thread = createStreamThread(CLIENT_ID, new StreamsConfig(props));
thread.setState(StreamThread.State.STARTING);
thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@ -1549,7 +1546,6 @@ public class StreamThreadTest {
consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null)));
thread.setState(StreamThread.State.STARTING);
thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@ -1615,7 +1611,6 @@ public class StreamThreadTest {
internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
thread.setState(StreamThread.State.STARTING);
thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@ -1698,7 +1693,6 @@ public class StreamThreadTest {
internalTopologyBuilder.buildTopology();
thread.setState(StreamThread.State.STARTING);
thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@ -1793,7 +1787,6 @@ public class StreamThreadTest {
consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null)));
thread.setState(StreamThread.State.STARTING);
thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@ -1858,7 +1851,6 @@ public class StreamThreadTest {
internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
thread.setState(StreamThread.State.STARTING);
thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@ -1940,7 +1932,6 @@ public class StreamThreadTest {
);
thread.setState(StreamThread.State.STARTING);
thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@ -2001,7 +1992,6 @@ public class StreamThreadTest {
restoreConsumer.updateBeginningOffsets(offsets);
thread.setState(StreamThread.State.STARTING);
thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
@ -2265,7 +2255,6 @@ public class StreamThreadTest {
thread = createStreamThread(CLIENT_ID, config);
thread.setState(StreamThread.State.STARTING);
thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final List<TopicPartition> assignedPartitions = new ArrayList<>();
@ -2345,7 +2334,6 @@ public class StreamThreadTest {
thread = createStreamThread(CLIENT_ID, stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(StreamThread.State.STARTING);
thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final List<TopicPartition> assignedPartitions = new ArrayList<>();
@ -2543,7 +2531,6 @@ public class StreamThreadTest {
thread = createStreamThread(CLIENT_ID, new StreamsConfig(properties));
thread.setState(StreamThread.State.STARTING);
thread.taskManager().init();
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
final TaskId task1 = new TaskId(0, t1p1.partition());
@ -3030,7 +3017,6 @@ public class StreamThreadTest {
thread = createStreamThread(CLIENT_ID, config);
thread.setState(StreamThread.State.STARTING);
thread.taskManager().init();
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
final TaskId task1 = new TaskId(0, t1p1.partition());
@ -3404,7 +3390,6 @@ public class StreamThreadTest {
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> clientInstanceIdFutures = thread.clientInstanceIds(Duration.ZERO);
@ -3429,7 +3414,6 @@ public class StreamThreadTest {
public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.clientInstanceIds(Duration.ZERO);
@ -3446,7 +3430,6 @@ public class StreamThreadTest {
public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.clientInstanceIds(Duration.ZERO);
@ -3463,7 +3446,6 @@ public class StreamThreadTest {
public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> producerFutures = thread.clientInstanceIds(Duration.ZERO);
@ -3481,7 +3463,6 @@ public class StreamThreadTest {
clientSupplier.consumer.disableTelemetry();
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.clientInstanceIds(Duration.ZERO);
@ -3499,7 +3480,6 @@ public class StreamThreadTest {
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.clientInstanceIds(Duration.ZERO);
@ -3519,7 +3499,6 @@ public class StreamThreadTest {
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> producerFutures = thread.clientInstanceIds(Duration.ZERO);
@ -3537,7 +3516,6 @@ public class StreamThreadTest {
clientSupplier.consumer.injectTimeoutException(-1);
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.clientInstanceIds(Duration.ZERO);
@ -3562,7 +3540,6 @@ public class StreamThreadTest {
clientSupplier.restoreConsumer.injectTimeoutException(-1);
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.clientInstanceIds(Duration.ZERO);
@ -3590,7 +3567,6 @@ public class StreamThreadTest {
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> producerFutures = thread.clientInstanceIds(Duration.ZERO);
@ -3607,10 +3583,9 @@ public class StreamThreadTest {
);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testNamedTopologyWithStreamsProtocol(final boolean stateUpdaterEnabled) {
final Properties props = configProps(false, stateUpdaterEnabled, false);
@Test
public void testNamedTopologyWithStreamsProtocol() {
final Properties props = configProps(false, false, false);
props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.toString());
final StreamsConfig config = new StreamsConfig(props);
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
@ -3667,10 +3642,9 @@ public class StreamThreadTest {
assertTrue(thread.streamsRebalanceData().isEmpty());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testStreamsRebalanceDataWithExtraCopartition(final boolean stateUpdaterEnabled) {
final Properties props = configProps(false, stateUpdaterEnabled, false);
@Test
public void testStreamsRebalanceDataWithExtraCopartition() {
final Properties props = configProps(false, false, false);
props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.toString());
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);