KAFKA-18067: Add a flag to disable producer reset during active task creator shutting down (#19269)

JIRA: KAFKA-18067

Fix producer client double-closing issue in Kafka Streams. 
During StreamThread shutdown, TaskManager closes first, which closes the
producer client. Later, calling `unsubscribe` on the main consumer may
trigger the `onPartitionsLost` callback, attempting to reset
StreamsProducer when EOS is enabled. This causes an already closed
producer to be closed twice while the newly created producer is never
closed.

In detail:
This patch adds a flag to control the producer reset and has a new
method to change this flag, which is only invoked in
`ActiveTaskCreator#close`.
This would guarantee that the disable reset producer will only occur
when StreamThread shuts down.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias Sax <mjsax@apache.org>
This commit is contained in:
TengYao Chi 2025-04-04 05:13:22 +08:00 committed by A. Sophie Blee-Goldman
parent 617c96cea4
commit b0b4f42f4c
2 changed files with 33 additions and 1 deletions

View File

@ -64,6 +64,7 @@ class ActiveTaskCreator {
private final StreamsProducer streamsProducer;
private final boolean stateUpdaterEnabled;
private final boolean processingThreadsEnabled;
private boolean isClosed = false;
ActiveTaskCreator(final TopologyMetadata topologyMetadata,
final StreamsConfig applicationConfig,
@ -118,14 +119,27 @@ class ActiveTaskCreator {
return clientSupplier.getProducer(producerConfig);
}
/**
* When {@link org.apache.kafka.streams.processor.internals.StreamThread} is shutting down,
* subsequent calls to reInitializeProducer() will not recreate
* the producer instance, avoiding resource leak.
*/
public void reInitializeProducer() {
streamsProducer.resetProducer(producer());
if (!isClosed) {
streamsProducer.resetProducer(producer());
}
}
StreamsProducer streamsProducer() {
return streamsProducer;
}
// visible for test
boolean isClosed() {
return isClosed;
}
// TODO: convert to StreamTask when we remove TaskManager#StateMachineTask with mocks
public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
@ -255,6 +269,7 @@ class ActiveTaskCreator {
void close() {
try {
isClosed = true;
streamsProducer.close();
} catch (final RuntimeException e) {
throw new StreamsException("Thread producer encounter error trying to close.", e);

View File

@ -190,9 +190,26 @@ public class ActiveTaskCreatorTest {
activeTaskCreator.close();
assertThat(activeTaskCreator.isClosed(), is(true));
assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
}
@Test
public void shouldNotResetProducerAfterDisableRest() {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
createTasks();
assertThat(mockClientSupplier.producers.size(), is(1));
activeTaskCreator.close();
activeTaskCreator.reInitializeProducer();
// Verifies that disableReset() prevents reInitializeProducer() from creating a new producer instance
// Without disabling reset, the producers collection would contain more than one producer
assertThat("Producer should not be recreated after disabling reset",
mockClientSupplier.producers.size(),
is(1));
}
// error handling
@Test