From 45327fd597b68ed4651361ecad913f7c719fffeb Mon Sep 17 00:00:00 2001 From: Gaurav Narula Date: Fri, 4 Jul 2025 22:00:10 +0100 Subject: [PATCH] KAFKA-18035: Backport TransactionsTest testBumpTransactionalEpochWithTV2Disabled failed on trunk (#20102) Backports the flakyness fix in #18451 to 4.0 branch > Sometimes we didn't get into abortable state before aborting, so the epoch didn't get bumped. Now we force abortable state with an attempt to send before aborting so the epoch bump occurs as expected. > > Reviewers: Jeff Kim Reviewers: Chia-Ping Tsai Co-authored-by: Justine Olshan --- .../kafka/api/TransactionsTest.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 74737668127..c531f62595a 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -21,7 +21,7 @@ import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue} import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.{ConcurrentTransactionsException, InvalidProducerEpochException, ProducerFencedException, TimeoutException} import org.apache.kafka.common.test.api.Flaky import org.apache.kafka.coordinator.group.GroupCoordinatorConfig @@ -738,6 +738,19 @@ class TransactionsTest extends IntegrationTestHarness { restartDeadBrokers() org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture, classOf[TimeoutException]) + // Ensure the producer transitions to abortable_error state. + TestUtils.waitUntilTrue(() => { + var failed = false + try { + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(testTopic, 0, "3", "3", willBeCommitted = false)) + } catch { + case e: Exception => + if (e.isInstanceOf[KafkaException]) + failed = true + } + failed + }, "The send request never failed as expected.") + assertThrows(classOf[KafkaException], () => producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(testTopic, 0, "3", "3", willBeCommitted = false))) producer.abortTransaction() producer.beginTransaction() @@ -760,7 +773,7 @@ class TransactionsTest extends IntegrationTestHarness { producerStateEntry = brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.get(producerId) assertNotNull(producerStateEntry) - assertTrue(producerStateEntry.producerEpoch > initialProducerEpoch) + assertTrue(producerStateEntry.producerEpoch > initialProducerEpoch, "InitialProduceEpoch: " + initialProducerEpoch + " ProducerStateEntry: " + producerStateEntry) } finally { producer.close(Duration.ZERO) }