mirror of https://github.com/apache/kafka.git
KAFKA-18035: Backport TransactionsTest testBumpTransactionalEpochWithTV2Disabled failed on trunk (#20102)
Backports the flakiness fix in #18451 to the 4.0 branch.

Sometimes the producer did not reach the abortable state before aborting, so the epoch did not get bumped. The test now forces the abortable state with a send attempt before aborting, so the epoch bump occurs as expected.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Co-authored-by: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
parent 4ce6f5cb92
commit 45327fd597
@@ -21,7 +21,7 @@ import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.{ConcurrentTransactionsException, InvalidProducerEpochException, ProducerFencedException, TimeoutException}
 import org.apache.kafka.common.test.api.Flaky
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
@@ -738,6 +738,19 @@ class TransactionsTest extends IntegrationTestHarness {
       restartDeadBrokers()
 
       org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture, classOf[TimeoutException])
+      // Ensure the producer transitions to abortable_error state.
+      TestUtils.waitUntilTrue(() => {
+        var failed = false
+        try {
+          producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(testTopic, 0, "3", "3", willBeCommitted = false))
+        } catch {
+          case e: Exception =>
+            if (e.isInstanceOf[KafkaException])
+              failed = true
+        }
+        failed
+      }, "The send request never failed as expected.")
+      assertThrows(classOf[KafkaException], () => producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(testTopic, 0, "3", "3", willBeCommitted = false)))
       producer.abortTransaction()
 
       producer.beginTransaction()
@@ -760,7 +773,7 @@ class TransactionsTest extends IntegrationTestHarness {
       producerStateEntry =
         brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.get(producerId)
       assertNotNull(producerStateEntry)
-      assertTrue(producerStateEntry.producerEpoch > initialProducerEpoch)
+      assertTrue(producerStateEntry.producerEpoch > initialProducerEpoch, "InitialProduceEpoch: " + initialProducerEpoch + " ProducerStateEntry: " + producerStateEntry)
     } finally {
       producer.close(Duration.ZERO)
     }
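Outside the test harness, the change above boils down to one pattern: after a request has timed out, keep attempting a send until the producer surfaces a KafkaException (indicating it has entered the abortable error state), and only then call abortTransaction() so the epoch bump happens as expected. Below is a minimal standalone sketch of that pattern, not the test itself: the object and method names, topic, timeout, and String key/value types are illustrative, and it assumes an already-initialized transactional KafkaProducer with an open transaction that has hit an error.

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.KafkaException

object ForceAbortableStateSketch {
  // Illustrative helper: poll with sends until the producer reports an
  // abortable error, then abort so the producer epoch gets bumped.
  def forceAbortableErrorThenAbort(producer: KafkaProducer[String, String],
                                   topic: String,
                                   timeoutMs: Long = 15000L): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var failed = false
    while (!failed && System.currentTimeMillis() < deadline) {
      try {
        // While the producer is still healthy this just enqueues a record; once it
        // has transitioned to the abortable error state, send() throws a KafkaException.
        producer.send(new ProducerRecord(topic, "key", "value"))
      } catch {
        case _: KafkaException => failed = true
      }
      if (!failed) Thread.sleep(100)
    }
    require(failed, "The send request never failed as expected.")
    // Aborting from the abortable error state is what bumps the producer epoch.
    producer.abortTransaction()
  }
}

The retry loop (TestUtils.waitUntilTrue in the actual test) rather than a single send is presumably needed because the producer only moves into the abortable error state once the timed-out request is expired internally, which happens asynchronously with respect to the test thread.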