KAFKA-17258 Migrate AdminFenceProducersIntegrationTest to ClusterTestExtensions framework (#17311)

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Chung, Ming-Yen 2024-10-03 00:47:29 +08:00 committed by GitHub
parent 7b7eb6243f
commit 540fb91103
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 147 additions and 161 deletions

View File

@ -223,6 +223,7 @@
<suppress checks="ImportControl" files="FetchResponseData.java"/>
<suppress checks="ImportControl" files="RecordsSerdeTest.java"/>
<suppress checks="ImportControl" files="ClientTelemetryTest.java"/>
<suppress checks="ImportControl" files="AdminFenceProducersTest.java"/>
<!-- Streams tests -->
<suppress checks="ClassFanOutComplexity"

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(serverProperties = {
@ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
@ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "2000")
})
@ExtendWith(ClusterTestExtensions.class)
public class AdminFenceProducersTest {
private static final String TOPIC_NAME = "mytopic";
private static final String TXN_ID = "mytxnid";
private static final String INCORRECT_BROKER_PORT = "225";
private static final ProducerRecord<byte[], byte[]> RECORD = new ProducerRecord<>(TOPIC_NAME, null, new byte[1]);
private final ClusterInstance clusterInstance;
AdminFenceProducersTest(ClusterInstance clusterInstance) {
this.clusterInstance = clusterInstance;
}
private KafkaProducer<byte[], byte[]> createProducer() {
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TXN_ID);
config.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "2000");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
return new KafkaProducer<>(config);
}
@ClusterTest
void testFenceAfterProducerCommit() throws Exception {
clusterInstance.createTopic(TOPIC_NAME, 1, (short) 1);
try (KafkaProducer<byte[], byte[]> producer = createProducer();
Admin adminClient = clusterInstance.createAdminClient()) {
producer.initTransactions();
producer.beginTransaction();
producer.send(RECORD).get();
producer.commitTransaction();
adminClient.fenceProducers(Collections.singletonList(TXN_ID)).all().get();
producer.beginTransaction();
ExecutionException exceptionDuringSend = assertThrows(
ExecutionException.class,
() -> producer.send(RECORD).get(), "expected ProducerFencedException"
);
assertInstanceOf(ProducerFencedException.class, exceptionDuringSend.getCause());
assertThrows(ProducerFencedException.class, producer::commitTransaction);
}
}
@ClusterTest
void testFenceProducerTimeoutMs() {
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + INCORRECT_BROKER_PORT);
try (Admin adminClient = clusterInstance.createAdminClient(config)) {
ExecutionException exception = assertThrows(
ExecutionException.class, () ->
adminClient.fenceProducers(Collections.singletonList(TXN_ID), new FenceProducersOptions().timeoutMs(0)).all().get());
assertInstanceOf(TimeoutException.class, exception.getCause());
}
}
@ClusterTest
void testFenceBeforeProducerCommit() throws Exception {
clusterInstance.createTopic(TOPIC_NAME, 1, (short) 1);
try (KafkaProducer<byte[], byte[]> producer = createProducer();
Admin adminClient = clusterInstance.createAdminClient()) {
producer.initTransactions();
producer.beginTransaction();
producer.send(RECORD).get();
adminClient.fenceProducers(Collections.singletonList(TXN_ID)).all().get();
ExecutionException exceptionDuringSend = assertThrows(
ExecutionException.class, () ->
producer.send(RECORD).get(), "expected ProducerFencedException"
);
assertTrue(exceptionDuringSend.getCause() instanceof ProducerFencedException ||
exceptionDuringSend.getCause() instanceof InvalidProducerEpochException);
ApiException exceptionDuringCommit = assertThrows(
ApiException.class,
producer::commitTransaction, "Expected Exception"
);
assertTrue(exceptionDuringCommit instanceof ProducerFencedException ||
exceptionDuringCommit instanceof InvalidProducerEpochException);
}
}
}

View File

@ -1,161 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package integration.kafka.admin
import kafka.api.IntegrationTestHarness
import kafka.security.minikdc.MiniKdc.createConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.server.config.ServerLogConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.time.Duration
import java.util.concurrent.ExecutionException
import java.util.{Collections, Properties}
import scala.collection.Seq
@Tag("integration")
class AdminFenceProducersIntegrationTest extends IntegrationTestHarness {
override def brokerCount = 1
private val topicName = "mytopic"
private val txnId = "mytxnid"
private val record = new ProducerRecord[Array[Byte], Array[Byte]](topicName, null, new Array[Byte](1))
private var adminClient: Admin = _
private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
val producerProps = new Properties
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId)
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "2000")
producer = createProducer(configOverrides = producerProps)
adminClient = createAdminClient()
createTopic(topicName)
}
def overridingProps(): Properties = {
val props = new Properties()
props.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, false.toString)
// Set a smaller value for the number of partitions for speed
props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 1.toString)
props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 1.toString)
props.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 1.toString)
props.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "2000")
props
}
override protected def modifyConfigs(props: Seq[Properties]): Unit = {
props.foreach(p => p.putAll(overridingProps()))
}
override protected def kraftControllerConfigs(testInfo: TestInfo): Seq[Properties] = {
Seq(overridingProps())
}
@AfterEach
override def tearDown(): Unit = {
Utils.closeQuietly(adminClient, "AdminFenceProducersIntegrationTest")
Utils.closeQuietly(producer, "AdminFenceProducersIntegrationTest")
super.tearDown()
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testFenceAfterProducerCommit(quorum: String): Unit = {
producer.initTransactions()
producer.beginTransaction()
producer.send(record).get()
producer.commitTransaction()
adminClient.fenceProducers(Collections.singletonList(txnId)).all().get()
producer.beginTransaction()
try {
producer.send(record).get()
fail("expected ProducerFencedException")
} catch {
case _: ProducerFencedException => //ok
case ee: ExecutionException =>
assertInstanceOf(classOf[ProducerFencedException], ee.getCause) //ok
case e: Exception =>
throw e
}
assertThrows(classOf[ProducerFencedException], () => producer.commitTransaction())
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@Timeout(value = 30)
def testFenceProducerTimeoutMs(quorum: String): Unit = {
adminClient = {
val config = createConfig
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
Admin.create(config)
}
try {
val e = assertThrows(classOf[ExecutionException], () => adminClient.fenceProducers(Collections.singletonList(txnId),
new FenceProducersOptions().timeoutMs(0)).all().get())
assertInstanceOf(classOf[TimeoutException], e.getCause)
} finally adminClient.close(Duration.ofSeconds(0))
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testFenceBeforeProducerCommit(quorum: String): Unit = {
producer.initTransactions()
producer.beginTransaction()
producer.send(record).get()
adminClient.fenceProducers(Collections.singletonList(txnId)).all().get()
try {
producer.send(record).get()
fail("expected Exception")
} catch {
case ee: ExecutionException =>
assertTrue(ee.getCause.isInstanceOf[ProducerFencedException] ||
ee.getCause.isInstanceOf[InvalidProducerEpochException],
"Unexpected ExecutionException cause " + ee.getCause)
case e: Exception =>
throw e
}
try {
producer.commitTransaction()
fail("expected Exception")
} catch {
case _: ProducerFencedException =>
case _: InvalidProducerEpochException =>
case e: Exception =>
throw e
}
}
}