HOTFIX: Revert "KAFKA-18067: Kafka Streams can leak Producer client under EOS (#17931)" (#19078)

This reverts commit e8837465a5.

The commit that is reverted prevents Kafka Streams from re-initializing
its transactional producer. If an exception that fences the
transactional producer occurs, the producer is not re-initialized during
the handling of the exception. That causes an infinite loop of
ProducerFencedExceptions with corresponding rebalances.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, David Jacot
<djacot@confluent.io>
This commit is contained in:
Bruno Cadonna 2025-03-03 11:58:56 +01:00 committed by Bruno Cadonna
parent d8bfd80486
commit d91fbc7fa7
3 changed files with 1 additions and 22 deletions

View File

@ -119,7 +119,6 @@ class ActiveTaskCreator {
} }
public void reInitializeProducer() { public void reInitializeProducer() {
if (!streamsProducer.isClosed())
streamsProducer.resetProducer(producer()); streamsProducer.resetProducer(producer());
} }

View File

@ -70,7 +70,6 @@ public class StreamsProducer {
private Producer<byte[], byte[]> producer; private Producer<byte[], byte[]> producer;
private boolean transactionInFlight = false; private boolean transactionInFlight = false;
private boolean transactionInitialized = false; private boolean transactionInitialized = false;
private boolean closed = false;
private double oldProducerTotalBlockedTime = 0; private double oldProducerTotalBlockedTime = 0;
// we have a single `StreamsProducer` per thread, and thus a single `sendException` instance, // we have a single `StreamsProducer` per thread, and thus a single `sendException` instance,
// which we share across all tasks, ie, all `RecordCollectorImpl` // which we share across all tasks, ie, all `RecordCollectorImpl`
@ -99,10 +98,6 @@ public class StreamsProducer {
return transactionInFlight; return transactionInFlight;
} }
boolean isClosed() {
return closed;
}
/** /**
* @throws IllegalStateException if EOS is disabled * @throws IllegalStateException if EOS is disabled
*/ */
@ -325,7 +320,6 @@ public class StreamsProducer {
void close() { void close() {
producer.close(); producer.close();
closed = true;
transactionInFlight = false; transactionInFlight = false;
transactionInitialized = false; transactionInitialized = false;
} }

View File

@ -190,23 +190,9 @@ public class ActiveTaskCreatorTest {
activeTaskCreator.close(); activeTaskCreator.close();
assertThat(activeTaskCreator.streamsProducer().isClosed(), is(true));
assertThat(mockClientSupplier.producers.get(0).closed(), is(true)); assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
} }
@Test
public void shouldNotReInitializeProducerOnClose() {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
createTasks();
activeTaskCreator.streamsProducer().close();
activeTaskCreator.reInitializeProducer();
// If streamsProducer is not closed, clientSupplier will recreate a producer,
// resulting in more than one producer being created.
assertThat(mockClientSupplier.producers.size(), is(1));
}
// error handling // error handling
@Test @Test