KAFKA-16482 Eliminate the IDE warnings of accepting ClusterConfig in BeforeEach (LeaderElectionCommandTest and ProducerIdsIntegrationTest) (#15676)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Cheng-Kai, Zhang 2024-04-11 18:42:14 +08:00 committed by GitHub
parent 619f27015f
commit 72b823e9bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 18 additions and 23 deletions

View File

@ -19,9 +19,9 @@ package kafka.coordinator.transaction
import kafka.network.SocketServer import kafka.network.SocketServer
import kafka.server.{IntegrationTestUtils, KafkaConfig} import kafka.server.{IntegrationTestUtils, KafkaConfig}
import kafka.test.annotation.{AutoStart, ClusterTest, ClusterTests, Type} import kafka.test.ClusterInstance
import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
import kafka.test.junit.ClusterTestExtensions import kafka.test.junit.ClusterTestExtensions
import kafka.test.{ClusterConfig, ClusterInstance}
import org.apache.kafka.common.message.InitProducerIdRequestData import org.apache.kafka.common.message.InitProducerIdRequestData
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
@ -29,22 +29,20 @@ import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse} import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse}
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Disabled, Timeout}
import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{Disabled, Timeout}
import java.util.stream.{Collectors, IntStream} import java.util.stream.{Collectors, IntStream}
import scala.concurrent.duration.DurationInt import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
@ClusterTestDefaults(serverProperties = Array(
new ClusterConfigProperty(key = "transaction.state.log.num.partitions", value = "1")
))
@ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ProducerIdsIntegrationTest { class ProducerIdsIntegrationTest {
@BeforeEach
def setup(clusterConfig: ClusterConfig): Unit = {
clusterConfig.serverProperties().put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
clusterConfig.serverProperties().put(KafkaConfig.TransactionsTopicReplicationFactorProp, "3")
}
@ClusterTests(Array( @ClusterTests(Array(
new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1), new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0), new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),

View File

@ -16,8 +16,8 @@
*/ */
package org.apache.kafka.tools; package org.apache.kafka.tools;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance; import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest; import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults; import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.Type; import kafka.test.annotation.Type;
@ -30,7 +30,6 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.server.common.AdminCommandFailedException; import org.apache.kafka.server.common.AdminCommandFailedException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import scala.collection.JavaConverters; import scala.collection.JavaConverters;
@ -60,7 +59,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ExtendWith(value = ClusterTestExtensions.class) @ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3) @ClusterTestDefaults(clusterType = Type.ALL, brokers = 3, serverProperties = {
@ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
@ClusterConfigProperty(key = "auto.leader.rebalance.enable", value = "false"),
@ClusterConfigProperty(key = "controlled.shutdown.enable", value = "true"),
@ClusterConfigProperty(key = "controlled.shutdown.max.retries", value = "1"),
@ClusterConfigProperty(key = "controlled.shutdown.retry.backoff.ms", value = "1000"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "2")
})
@Tag("integration") @Tag("integration")
public class LeaderElectionCommandTest { public class LeaderElectionCommandTest {
private final ClusterInstance cluster; private final ClusterInstance cluster;
@ -71,16 +77,6 @@ public class LeaderElectionCommandTest {
this.cluster = cluster; this.cluster = cluster;
} }
@BeforeEach
void setup(ClusterConfig clusterConfig) {
TestUtils.verifyNoUnexpectedThreads("@BeforeEach");
clusterConfig.serverProperties().put("auto.leader.rebalance.enable", "false");
clusterConfig.serverProperties().put("controlled.shutdown.enable", "true");
clusterConfig.serverProperties().put("controlled.shutdown.max.retries", "1");
clusterConfig.serverProperties().put("controlled.shutdown.retry.backoff.ms", "1000");
clusterConfig.serverProperties().put("offsets.topic.replication.factor", "2");
}
@ClusterTest @ClusterTest
public void testAllTopicPartition() throws InterruptedException, ExecutionException { public void testAllTopicPartition() throws InterruptedException, ExecutionException {
String topic = "unclean-topic"; String topic = "unclean-topic";
@ -244,7 +240,7 @@ public class LeaderElectionCommandTest {
@ClusterTest @ClusterTest
public void testTopicDoesNotExist() { public void testTopicDoesNotExist() {
Throwable e = assertThrows(AdminCommandFailedException.class, () -> LeaderElectionCommand.run( Throwable e = assertThrows(AdminCommandFailedException.class, () -> LeaderElectionCommand.run(
Duration.ofSeconds(30), Duration.ofSeconds(30),
"--bootstrap-server", cluster.bootstrapServers(), "--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred", "--election-type", "preferred",
@ -326,6 +322,7 @@ public class LeaderElectionCommandTest {
return file.toPath(); return file.toPath();
} }
private static Path tempAdminConfig(String defaultApiTimeoutMs, String requestTimeoutMs) throws Exception { private static Path tempAdminConfig(String defaultApiTimeoutMs, String requestTimeoutMs) throws Exception {
String content = "default.api.timeout.ms=" + defaultApiTimeoutMs + "\nrequest.timeout.ms=" + requestTimeoutMs; String content = "default.api.timeout.ms=" + defaultApiTimeoutMs + "\nrequest.timeout.ms=" + requestTimeoutMs;
java.io.File file = TestUtils.tempFile("admin-config", ".properties"); java.io.File file = TestUtils.tempFile("admin-config", ".properties");