KAFKA-19042 Move TransactionsExpirationTest to client-integration-tests module (#19288)

Rewrite `TransactionsExpirationTest` in Java using the new test infra and
move it to the client-integration-tests module.
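
For context, the new test infra starts the cluster from annotations and injects a `ClusterInstance` handle, replacing the `KafkaServerTestHarness` subclassing used by the old Scala test. A minimal sketch of the pattern, using the same annotation types as the new file below (the class and method names here are illustrative only):

    import java.util.Map;

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.test.ClusterInstance;
    import org.apache.kafka.common.test.api.ClusterTest;
    import org.apache.kafka.common.test.api.ClusterTestDefaults;
    import org.apache.kafka.common.test.api.Type;

    @ClusterTestDefaults(types = {Type.CO_KRAFT}, brokers = 3)
    public class ExampleClusterTest {
        @ClusterTest
        public void testExample(ClusterInstance clusterInstance) {
            // The framework provisions the cluster per @ClusterTest and injects a
            // handle to it; clients are built directly from the ClusterInstance.
            try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of())) {
                // send records and assert on broker/cluster state ...
            }
        }
    }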

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
PoAn Yang 2025-04-05 20:01:31 +08:00 committed by GitHub
parent ebb62812d9
commit 3d96b20630
3 changed files with 327 additions and 252 deletions


@@ -0,0 +1,310 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterFeature;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(
types = {Type.CO_KRAFT},
brokers = 3,
serverProperties = {
@ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
// Set a smaller value for the number of partitions for the __consumer_offsets topic
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long.
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "3"),
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "2"),
@ClusterConfigProperty(key = ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = "log.unclean.leader.election.enable", value = "false"),
@ClusterConfigProperty(key = ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"),
@ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "200"),
@ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, value = "10000"),
@ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, value = "500"),
@ClusterConfigProperty(key = TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, value = "5000"),
@ClusterConfigProperty(key = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, value = "500"),
}
)
public class TransactionsExpirationTest {
private static final String TOPIC1 = "topic1";
private static final String TOPIC2 = "topic2";
private static final String TRANSACTION_ID = "transactionalProducer";
private static final String HEADER_KEY = "transactionStatus";
private static final byte[] ABORTED_VALUE = "aborted".getBytes();
private static final byte[] COMMITTED_VALUE = "committed".getBytes();
private static final TopicPartition TOPIC1_PARTITION0 = new TopicPartition(TOPIC1, 0);
@ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)})
public void testFatalErrorAfterInvalidProducerIdMappingWithTV1(ClusterInstance clusterInstance) throws InterruptedException {
testFatalErrorAfterInvalidProducerIdMapping(clusterInstance);
}
@ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
public void testFatalErrorAfterInvalidProducerIdMappingWithTV2(ClusterInstance clusterInstance) throws InterruptedException {
testFatalErrorAfterInvalidProducerIdMapping(clusterInstance);
}
@ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)})
public void testTransactionAfterProducerIdExpiresWithTV1(ClusterInstance clusterInstance) throws InterruptedException {
testTransactionAfterProducerIdExpires(clusterInstance, false);
}
@ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
public void testTransactionAfterProducerIdExpiresWithTV2(ClusterInstance clusterInstance) throws InterruptedException {
testTransactionAfterProducerIdExpires(clusterInstance, true);
}
private void testFatalErrorAfterInvalidProducerIdMapping(ClusterInstance clusterInstance) throws InterruptedException {
clusterInstance.createTopic(TOPIC1, 4, (short) 3);
clusterInstance.createTopic(TOPIC2, 4, (short) 3);
try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID
))
) {
producer.initTransactions();
// Start and then abort a transaction to allow the transactional ID to expire.
producer.beginTransaction();
producer.send(new ProducerRecord<>(TOPIC1, 0, "2".getBytes(), "2".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
producer.send(new ProducerRecord<>(TOPIC2, 0, "4".getBytes(), "4".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
producer.abortTransaction();
// Check the transactional state exists and then wait for it to expire.
waitUntilTransactionalStateExists(clusterInstance);
waitUntilTransactionalStateExpires(clusterInstance);
// Start a new transaction and attempt to send, triggering an AddPartitionsToTxnRequest that will fail
// due to the expired transactional ID, resulting in a fatal error.
producer.beginTransaction();
Future<RecordMetadata> failedFuture = producer.send(
new ProducerRecord<>(TOPIC1, 3, "1".getBytes(), "1".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
TestUtils.waitForCondition(failedFuture::isDone, "Producer future never completed.");
org.apache.kafka.test.TestUtils.assertFutureThrows(InvalidPidMappingException.class, failedFuture);
// Assert that aborting the transaction throws a KafkaException due to the fatal error.
assertThrows(KafkaException.class, producer::abortTransaction);
}
// Reinitialize to recover from the fatal error.
try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID
))
) {
producer.initTransactions();
// Proceed with a new transaction after reinitializing.
producer.beginTransaction();
producer.send(new ProducerRecord<>(TOPIC2, null, "2".getBytes(), "2".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
producer.send(new ProducerRecord<>(TOPIC1, 2, "4".getBytes(), "4".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
producer.send(new ProducerRecord<>(TOPIC2, null, "1".getBytes(), "1".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
producer.send(new ProducerRecord<>(TOPIC1, 3, "3".getBytes(), "3".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
producer.commitTransaction();
waitUntilTransactionalStateExists(clusterInstance);
}
assertConsumeRecords(clusterInstance, List.of(TOPIC1, TOPIC2), 4);
}
private void testTransactionAfterProducerIdExpires(ClusterInstance clusterInstance, boolean isTV2Enabled) throws InterruptedException {
clusterInstance.createTopic(TOPIC1, 4, (short) 3);
long oldProducerId = 0;
long oldProducerEpoch = 0;
try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID
))
) {
producer.initTransactions();
// Start and then abort a transaction to allow the producer ID to expire.
producer.beginTransaction();
producer.send(new ProducerRecord<>(TOPIC1, 0, "2".getBytes(), "2".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
producer.flush();
// Ensure producer IDs are added.
List<ProducerState> producerStates = new ArrayList<>();
TestUtils.waitForCondition(() -> {
try {
producerStates.addAll(producerState(clusterInstance));
return !producerStates.isEmpty();
} catch (ExecutionException | InterruptedException e) {
return false;
}
}, "Producer IDs for " + TOPIC1_PARTITION0 + " did not propagate quickly");
assertEquals(1, producerStates.size(), "Unexpected producer to " + TOPIC1_PARTITION0);
oldProducerId = producerStates.get(0).producerId();
oldProducerEpoch = producerStates.get(0).producerEpoch();
producer.abortTransaction();
// Wait for the producer ID to expire.
TestUtils.waitForCondition(() -> {
try {
return producerState(clusterInstance).isEmpty();
} catch (ExecutionException | InterruptedException e) {
return false;
}
}, "Producer IDs for " + TOPIC1_PARTITION0 + " did not expire.");
}
// Create a new producer to check that we retain the producer ID in transactional state.
try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID
))
) {
producer.initTransactions();
// Start a new transaction and attempt to send. This should work since only the producer ID was removed from its mapping in ProducerStateManager.
producer.beginTransaction();
producer.send(new ProducerRecord<>(TOPIC1, 0, "4".getBytes(), "4".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
producer.send(new ProducerRecord<>(TOPIC1, 3, "3".getBytes(), "3".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
producer.commitTransaction();
// Producer IDs should repopulate.
List<ProducerState> producerStates = new ArrayList<>();
TestUtils.waitForCondition(() -> {
try {
producerStates.addAll(producerState(clusterInstance));
return !producerStates.isEmpty();
} catch (ExecutionException | InterruptedException e) {
return false;
}
}, "Producer IDs for " + TOPIC1_PARTITION0 + " did not propagate quickly");
assertEquals(1, producerStates.size(), "Unexpected producer to " + TOPIC1_PARTITION0);
long newProducerId = producerStates.get(0).producerId();
long newProducerEpoch = producerStates.get(0).producerEpoch();
// Because the transaction IDs outlive the producer IDs, creating a producer with the same transactional id
// soon after the first will re-use the same producerId, while bumping the epoch to indicate that they are distinct.
assertEquals(oldProducerId, newProducerId);
if (isTV2Enabled) {
// TV2 bumps epoch on EndTxn, and the final commit may or may not have bumped the epoch in the producer state.
// The epoch should be at least oldProducerEpoch + 2 for the first commit and the restarted producer.
assertTrue(oldProducerEpoch + 2 <= newProducerEpoch);
} else {
assertEquals(oldProducerEpoch + 1, newProducerEpoch);
}
assertConsumeRecords(clusterInstance, List.of(TOPIC1), 2);
}
}
private void waitUntilTransactionalStateExists(ClusterInstance clusterInstance) throws InterruptedException {
try (Admin admin = clusterInstance.admin()) {
TestUtils.waitForCondition(() -> {
try {
admin.describeTransactions(List.of(TRANSACTION_ID)).description(TRANSACTION_ID).get();
return true;
} catch (Exception e) {
return false;
}
}, "Transactional state was never added.");
}
}
private void waitUntilTransactionalStateExpires(ClusterInstance clusterInstance) throws InterruptedException {
try (Admin admin = clusterInstance.admin()) {
TestUtils.waitForCondition(() -> {
try {
admin.describeTransactions(List.of(TRANSACTION_ID)).description(TRANSACTION_ID).get();
return false;
} catch (Exception e) {
return e.getCause() instanceof TransactionalIdNotFoundException;
}
}, "Transaction state never expired.");
}
}
private List<ProducerState> producerState(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
try (Admin admin = clusterInstance.admin()) {
return admin.describeProducers(List.of(TOPIC1_PARTITION0)).partitionResult(TOPIC1_PARTITION0).get().activeProducers();
}
}
private void assertConsumeRecords(
ClusterInstance clusterInstance,
List<String> topics,
int expectedCount
) throws InterruptedException {
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
ArrayList<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name(),
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"
)
)) {
consumer.subscribe(topics);
TestUtils.waitForCondition(() -> {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
records.forEach(consumerRecords::add);
return consumerRecords.size() == expectedCount;
}, 15_000, () -> "Consumer with protocol " + groupProtocol.name + " should consume " + expectedCount + " records, but got " + consumerRecords.size());
}
consumerRecords.forEach(record -> {
Iterator<Header> headers = record.headers().headers(HEADER_KEY).iterator();
assertTrue(headers.hasNext());
Header header = headers.next();
assertArrayEquals(COMMITTED_VALUE, header.value(), "Record does not have the expected header value.");
});
}
}
}


@@ -1,247 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.util.{Collections, Properties}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.utils.TestUtils.{consumeRecords, createAdminClient}
import org.apache.kafka.clients.admin.{Admin, ProducerState}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.{InvalidPidMappingException, TransactionalIdNotFoundException}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
import scala.jdk.CollectionConverters._
import scala.collection.Seq
// Test class that uses a very small transaction timeout to trigger InvalidPidMapping errors
class TransactionsExpirationTest extends KafkaServerTestHarness {
val topic1 = "topic1"
val topic2 = "topic2"
val numPartitions = 4
val replicationFactor = 3
val tp0 = new TopicPartition(topic1, 0)
var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
var consumer: Consumer[Array[Byte], Array[Byte]] = _
var admin: Admin = _
override def generateConfigs: Seq[KafkaConfig] = {
TestUtils.createBrokerConfigs(3).map(KafkaConfig.fromProps(_, serverProps()))
}
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
producer = TestUtils.createTransactionalProducer("transactionalProducer", brokers)
consumer = TestUtils.createConsumer(bootstrapServers(),
groupProtocolFromTestParameters(),
enableAutoCommit = false,
readCommitted = true)
admin = createAdminClient(brokers, listenerName)
createTopic(topic1, numPartitions, 3)
createTopic(topic2, numPartitions, 3)
}
@AfterEach
override def tearDown(): Unit = {
if (producer != null)
producer.close()
if (consumer != null)
consumer.close()
if (admin != null)
admin.close()
super.tearDown()
}
@ParameterizedTest
@CsvSource(Array(
"kraft,classic,false",
"kraft,consumer,false",
"kraft,classic,true",
"kraft,consumer,true",
))
def testFatalErrorAfterInvalidProducerIdMapping(quorum: String, groupProtocol: String, isTV2Enabled: Boolean): Unit = {
producer.initTransactions()
// Start and then abort a transaction to allow the transactional ID to expire.
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", willBeCommitted = false))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 0, "4", "4", willBeCommitted = false))
producer.abortTransaction()
// Check the transactional state exists and then wait for it to expire.
waitUntilTransactionalStateExists()
waitUntilTransactionalStateExpires()
// Start a new transaction and attempt to send, triggering an AddPartitionsToTxnRequest that will fail
// due to the expired transactional ID, resulting in a fatal error.
producer.beginTransaction()
val failedFuture = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, "1", "1", willBeCommitted = false))
TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never completed.")
org.apache.kafka.test.TestUtils.assertFutureThrows(classOf[InvalidPidMappingException], failedFuture)
// Assert that aborting the transaction throws a KafkaException due to the fatal error.
assertThrows(classOf[KafkaException], () => producer.abortTransaction())
// Close the producer and reinitialize to recover from the fatal error.
producer.close()
producer = TestUtils.createTransactionalProducer("transactionalProducer", brokers)
producer.initTransactions()
// Proceed with a new transaction after reinitializing.
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "2", willBeCommitted = true))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 2, "4", "4", willBeCommitted = true))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "1", "1", willBeCommitted = true))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, "3", "3", willBeCommitted = true))
producer.commitTransaction()
waitUntilTransactionalStateExists()
consumer.subscribe(List(topic1, topic2).asJava)
val records = consumeRecords(consumer, 4)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
}
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.isTV2Enabled={2}")
@CsvSource(Array(
"kraft,classic,false",
"kraft,consumer,false",
"kraft,classic,true",
"kraft,consumer,true",
))
def testTransactionAfterProducerIdExpires(quorum: String, groupProtocol: String, isTV2Enabled: Boolean): Unit = {
producer.initTransactions()
// Start and then abort a transaction to allow the producer ID to expire.
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", willBeCommitted = false))
producer.flush()
// Ensure producer IDs are added.
var pState : List[ProducerState] = null
TestUtils.waitUntilTrue(() => { pState = producerState; pState.nonEmpty}, "Producer IDs for topic1 did not propagate quickly")
assertEquals(1, pState.size, "Unexpected producer to topic1")
val oldProducerId = pState.head.producerId
val oldProducerEpoch = pState.head.producerEpoch
producer.abortTransaction()
// Wait for the producer ID to expire.
TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer IDs for topic1 did not expire.")
// Create a new producer to check that we retain the producer ID in transactional state.
producer.close()
producer = TestUtils.createTransactionalProducer("transactionalProducer", brokers)
producer.initTransactions()
// Start a new transaction and attempt to send. This should work since only the producer ID was removed from its mapping in ProducerStateManager.
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", willBeCommitted = true))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, "3", "3", willBeCommitted = true))
producer.commitTransaction()
// Producer IDs should repopulate.
var pState2 : List[ProducerState] = null
TestUtils.waitUntilTrue(() => {pState2 = producerState; pState2.nonEmpty}, "Producer IDs for topic1 did not propagate quickly")
assertEquals(1, pState2.size, "Unexpected producer to topic1")
val newProducerId = pState2.head.producerId
val newProducerEpoch = pState2.head.producerEpoch
// Because the transaction IDs outlive the producer IDs, creating a producer with the same transactional id
// soon after the first will re-use the same producerId, while bumping the epoch to indicate that they are distinct.
assertEquals(oldProducerId, newProducerId)
if (isTV2Enabled) {
// TV2 bumps epoch on EndTxn, and the final commit may or may not have bumped the epoch in the producer state.
// The epoch should be at least oldProducerEpoch + 2 for the first commit and the restarted producer.
assertTrue(oldProducerEpoch + 2 <= newProducerEpoch)
} else {
assertEquals(oldProducerEpoch + 1, newProducerEpoch)
}
consumer.subscribe(List(topic1).asJava)
val records = consumeRecords(consumer, 2)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
}
private def producerState: List[ProducerState] = {
val describeResult = admin.describeProducers(Collections.singletonList(tp0))
val activeProducers = describeResult.partitionResult(tp0).get().activeProducers
activeProducers.asScala.toList
}
private def waitUntilTransactionalStateExpires(): Unit = {
TestUtils.waitUntilTrue(() => {
var removedTransactionState = false
val txnDescribeResult = admin.describeTransactions(Collections.singletonList("transactionalProducer")).description("transactionalProducer")
try {
txnDescribeResult.get()
} catch {
case e: Exception => {
removedTransactionState = e.getCause.isInstanceOf[TransactionalIdNotFoundException]
}
}
removedTransactionState
}, "Transaction state never expired.")
}
private def waitUntilTransactionalStateExists(): Unit = {
val describeState = admin.describeTransactions(Collections.singletonList("transactionalProducer")).description("transactionalProducer")
TestUtils.waitUntilTrue(() => describeState.isDone, "Transactional state was never added.")
}
private def serverProps(): Properties = {
val serverProps = new Properties()
serverProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, false.toString)
// Set a smaller value for the number of partitions for the __consumer_offsets topic
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long.
serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString)
serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 3.toString)
serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toString)
serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 2.toString)
serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, true.toString)
serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, false.toString)
serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, false.toString)
serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0")
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200")
serverProps.put(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, "10000")
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, "500")
serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, "5000")
serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, "500")
serverProps
}
}


@@ -119,7 +119,7 @@ public class TestUtils {
      * uses default value of 15 seconds for timeout
      */
     public static void waitForCondition(final Supplier<Boolean> testCondition, final String conditionDetails) throws InterruptedException {
-        waitForCondition(testCondition, DEFAULT_MAX_WAIT_MS, conditionDetails);
+        waitForCondition(testCondition, DEFAULT_MAX_WAIT_MS, () -> conditionDetails);
     }

     /**
@@ -128,9 +128,9 @@ public class TestUtils {
      * without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to
      * avoid transient failures due to slow or overloaded machines.
      */
     public static void waitForCondition(final Supplier<Boolean> testCondition,
                                         final long maxWaitMs,
-                                        String conditionDetails) throws InterruptedException {
+                                        final Supplier<String> conditionDetails) throws InterruptedException {
         final long expectedEnd = System.currentTimeMillis() + maxWaitMs;
         while (true) {
@@ -138,7 +138,7 @@ public class TestUtils {
             if (testCondition.get()) {
                 return;
             }
-            String conditionDetail = conditionDetails == null ? "" : conditionDetails;
+            String conditionDetail = conditionDetails.get() == null ? "" : conditionDetails.get();
             throw new TimeoutException("Condition not met: " + conditionDetail);
         } catch (final AssertionError t) {
             if (expectedEnd <= System.currentTimeMillis()) {
@@ -153,6 +153,18 @@ public class TestUtils {
         }
     }

+    /**
+     * Wait for condition to be met for at most {@code maxWaitMs} and throw assertion failure otherwise.
+     * This should be used instead of {@code Thread.sleep} whenever possible as it allows a longer timeout to be used
+     * without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to
+     * avoid transient failures due to slow or overloaded machines.
+     */
+    public static void waitForCondition(final Supplier<Boolean> testCondition,
+                                        final long maxWaitMs,
+                                        String conditionDetails) throws InterruptedException {
+        waitForCondition(testCondition, maxWaitMs, () -> conditionDetails);
+    }
+
     public static int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
                                                                  String topic,
                                                                  int partitionNumber,
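
The new `Supplier<String>` overload defers building the failure message until the wait actually times out, so callers can reference mutable state without paying for string construction on every poll. A usage sketch, mirroring the call in the new test above:

    // The message Supplier is evaluated only on timeout, so it can cheaply
    // capture state that changes while the condition is polled.
    TestUtils.waitForCondition(
        () -> consumerRecords.size() == expectedCount,
        15_000,
        () -> "expected " + expectedCount + " records, but got " + consumerRecords.size());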