mirror of https://github.com/apache/kafka.git
KAFKA-15764: Missing Tests for Transactions (#14702)
I ran this test 40 times without KAFKA-15653 with and without compression enabled. With compression it failed 39/40 times and without it passed 40/40 times. With the KAFKA-15653 and compression it passed 40/40 times locally Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
parent
3bd8ec16f6
commit
ed7ad6d9d3
|
|
@ -26,6 +26,7 @@ import kafka.server.KafkaConfig
|
|||
import kafka.utils.{TestInfoUtils, TestUtils}
|
||||
import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue}
|
||||
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerGroupMetadata, OffsetAndMetadata}
|
||||
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
|
||||
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
|
||||
import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
|
|
@ -820,6 +821,33 @@ class TransactionsTest extends IntegrationTestHarness {
|
|||
assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||
@ValueSource(strings = Array("zk", "kraft"))
|
||||
def testTransactionsWithCompression(quorum: String): Unit = {
|
||||
val numRecords = 50
|
||||
val numProducersWithCompression = 5
|
||||
val numTransactions = 40
|
||||
val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
|
||||
|
||||
for (i <- 0 until numProducersWithCompression) {
|
||||
transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy")
|
||||
}
|
||||
createTopic("topic", 100, brokerCount, topicConfig())
|
||||
transactionalCompressionProducers.foreach(_.initTransactions())
|
||||
|
||||
for (i <- 0 until numTransactions) {
|
||||
transactionalCompressionProducers.foreach(_.beginTransaction())
|
||||
|
||||
for (i <- 0 until numRecords) {
|
||||
transactionalCompressionProducers.foreach(producer =>
|
||||
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus("topic", null, i.toString, producer.toString, willBeCommitted = true),
|
||||
new ErrorLoggingCallback("topic", i.toString.getBytes(StandardCharsets.UTF_8), producer.toString.getBytes(StandardCharsets.UTF_8), true))
|
||||
)
|
||||
}
|
||||
transactionalCompressionProducers.foreach(_.commitTransaction())
|
||||
}
|
||||
}
|
||||
|
||||
private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String,
|
||||
start: Int, end: Int, willBeCommitted: Boolean): Unit = {
|
||||
for (i <- start until end) {
|
||||
|
|
@ -852,14 +880,16 @@ class TransactionsTest extends IntegrationTestHarness {
|
|||
transactionTimeoutMs: Long = 60000,
|
||||
maxBlockMs: Long = 60000,
|
||||
deliveryTimeoutMs: Int = 120000,
|
||||
requestTimeoutMs: Int = 30000): KafkaProducer[Array[Byte], Array[Byte]] = {
|
||||
requestTimeoutMs: Int = 30000,
|
||||
compressionType: String = "none"): KafkaProducer[Array[Byte], Array[Byte]] = {
|
||||
val producer = TestUtils.createTransactionalProducer(
|
||||
transactionalId,
|
||||
brokers,
|
||||
transactionTimeoutMs = transactionTimeoutMs,
|
||||
maxBlockMs = maxBlockMs,
|
||||
deliveryTimeoutMs = deliveryTimeoutMs,
|
||||
requestTimeoutMs = requestTimeoutMs
|
||||
requestTimeoutMs = requestTimeoutMs,
|
||||
compressionType = compressionType
|
||||
)
|
||||
transactionalProducers += producer
|
||||
producer
|
||||
|
|
|
|||
|
|
@ -1896,7 +1896,8 @@ object TestUtils extends Logging {
|
|||
maxBlockMs: Long = 60000,
|
||||
deliveryTimeoutMs: Int = 120000,
|
||||
requestTimeoutMs: Int = 30000,
|
||||
maxInFlight: Int = 5): KafkaProducer[Array[Byte], Array[Byte]] = {
|
||||
maxInFlight: Int = 5,
|
||||
compressionType: String = "none"): KafkaProducer[Array[Byte], Array[Byte]] = {
|
||||
val props = new Properties()
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, plaintextBootstrapServers(brokers))
|
||||
props.put(ProducerConfig.ACKS_CONFIG, "all")
|
||||
|
|
@ -1908,6 +1909,7 @@ object TestUtils extends Logging {
|
|||
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs.toString)
|
||||
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString)
|
||||
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlight.toString)
|
||||
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
|
||||
new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue