KAFKA-18067: Kafka Streams can leak Producer client under EOS (#17931)

To avoid leaking producers, we should add a 'closedflag toStreamProducer` indicating whether we should reset prouder.

Reviewers: Guozhang Wang <guozhang.wang.us@gmail.com>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
TengYao Chi 2024-12-10 08:12:05 +08:00 committed by GitHub
parent 05fd36a3b7
commit e8837465a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 22 additions and 1 deletions

View File

@ -119,6 +119,7 @@ class ActiveTaskCreator {
}
public void reInitializeProducer() {
if (!streamsProducer.isClosed())
streamsProducer.resetProducer(producer());
}

View File

@ -70,6 +70,7 @@ public class StreamsProducer {
private Producer<byte[], byte[]> producer;
private boolean transactionInFlight = false;
private boolean transactionInitialized = false;
private boolean closed = false;
private double oldProducerTotalBlockedTime = 0;
// we have a single `StreamsProducer` per thread, and thus a single `sendException` instance,
// which we share across all tasks, ie, all `RecordCollectorImpl`
@ -98,6 +99,10 @@ public class StreamsProducer {
return transactionInFlight;
}
boolean isClosed() {
return closed;
}
/**
* @throws IllegalStateException if EOS is disabled
*/
@ -320,6 +325,7 @@ public class StreamsProducer {
void close() {
producer.close();
closed = true;
transactionInFlight = false;
transactionInitialized = false;
}

View File

@ -190,9 +190,23 @@ public class ActiveTaskCreatorTest {
activeTaskCreator.close();
assertThat(activeTaskCreator.streamsProducer().isClosed(), is(true));
assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
}
@Test
public void shouldNotReInitializeProducerOnClose() {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
createTasks();
activeTaskCreator.streamsProducer().close();
activeTaskCreator.reInitializeProducer();
// If streamsProducer is not closed, clientSupplier will recreate a producer,
// resulting in more than one producer being created.
assertThat(mockClientSupplier.producers.size(), is(1));
}
// error handling
@Test