KAFKA-18695 Remove quorum=kraft and kip932 from all integration tests (#19633)
CI / build (push) Waiting to run Details

Currently, the quorum uses kraft by default, so there's no need to
specify it explicitly.

The kip932 parameter and isShareGroupTest are no longer used after #19542.

Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
 <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ming-Yen Chung 2025-05-12 01:28:30 +08:00 committed by GitHub
parent 54fd1361e5
commit 57ae6d6706
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
55 changed files with 776 additions and 1211 deletions

View File

@ -50,9 +50,8 @@ import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.time.Duration;
@ -137,9 +136,8 @@ public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarnes
if (adminClient != null) adminClient.close();
}
@ParameterizedTest
@ValueSource(strings = {"kraft"})
public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr(String quorum) throws ExecutionException, InterruptedException {
@Test
public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException {
adminClient.createTopics(
List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
@ -224,9 +222,8 @@ public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarnes
);
}
@ParameterizedTest
@ValueSource(strings = {"kraft"})
public void testElrMemberCanBeElected(String quorum) throws ExecutionException, InterruptedException {
@Test
public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException {
adminClient.createTopics(
List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
@ -300,9 +297,8 @@ public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarnes
}
}
@ParameterizedTest
@ValueSource(strings = {"kraft"})
public void testElrMemberShouldBeKickOutWhenUncleanShutdown(String quorum) throws ExecutionException, InterruptedException {
@Test
public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException {
adminClient.createTopics(
List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
@ -361,9 +357,8 @@ public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarnes
/*
This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed.
*/
@ParameterizedTest
@ValueSource(strings = {"kraft"})
public void testLastKnownLeaderShouldBeElectedIfEmptyElr(String quorum) throws ExecutionException, InterruptedException {
@Test
public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException {
adminClient.createTopics(
List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);

View File

@ -28,9 +28,9 @@ import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
import org.junit.jupiter.api.{BeforeEach, Tag, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import org.junit.jupiter.params.provider.CsvSource
import java.util
import java.util.concurrent.atomic.AtomicInteger
@ -73,9 +73,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
testTopicName = s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithValidRetentionTime(quorum: String): Unit = {
@Test
def testCreateRemoteTopicWithValidRetentionTime(): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
@ -85,9 +84,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithValidRetentionSize(quorum: String): Unit = {
@Test
def testCreateRemoteTopicWithValidRetentionSize(): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
@ -97,9 +95,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithInheritedLocalRetentionTime(quorum: String): Unit = {
@Test
def testCreateRemoteTopicWithInheritedLocalRetentionTime(): Unit = {
// inherited local retention ms is 1000
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@ -109,9 +106,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithInheritedLocalRetentionSize(quorum: String): Unit = {
@Test
def testCreateRemoteTopicWithInheritedLocalRetentionSize(): Unit = {
// inherited local retention bytes is 1024
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@ -121,9 +117,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithInvalidRetentionTime(quorum: String): Unit = {
@Test
def testCreateRemoteTopicWithInvalidRetentionTime(): Unit = {
// inherited local retention ms is 1000
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@ -133,9 +128,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig = topicConfig))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithInvalidRetentionSize(quorum: String): Unit = {
@Test
def testCreateRemoteTopicWithInvalidRetentionSize(): Unit = {
// inherited local retention bytes is 1024
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@ -145,9 +139,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig = topicConfig))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateCompactedRemoteStorage(quorum: String): Unit = {
@Test
def testCreateCompactedRemoteStorage(): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact")
@ -158,8 +151,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
// `remote.log.delete.on.disable` and `remote.log.copy.disable` only works in KRaft mode.
@ParameterizedTest
@CsvSource(Array("kraft,true,true", "kraft,true,false", "kraft,false,true", "kraft,false,false"))
def testCreateRemoteTopicWithCopyDisabledAndDeleteOnDisable(quorum: String, copyDisabled: Boolean, deleteOnDisable: Boolean): Unit = {
@CsvSource(Array("true,true", "true,false", "false,true", "false,false"))
def testCreateRemoteTopicWithCopyDisabledAndDeleteOnDisable(copyDisabled: Boolean, deleteOnDisable: Boolean): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, copyDisabled.toString)
topicConfig.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, deleteOnDisable.toString)
@ -169,9 +162,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}
// `remote.log.delete.on.disable` only works in KRaft mode.
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateTopicRetentionMsValidationWithRemoteCopyDisabled(quorum: String): Unit = {
@Test
def testCreateTopicRetentionMsValidationWithRemoteCopyDisabled(): Unit = {
val testTopicName2 = testTopicName + "2"
val testTopicName3 = testTopicName + "3"
val errorMsgMs = "When `remote.log.copy.disable` is set to true, the `local.retention.ms` and `retention.ms` " +
@ -235,9 +227,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
admin.incrementalAlterConfigs(configs).all().get()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateTopicRetentionBytesValidationWithRemoteCopyDisabled(quorum: String): Unit = {
@Test
def testCreateTopicRetentionBytesValidationWithRemoteCopyDisabled(): Unit = {
val testTopicName2 = testTopicName + "2"
val testTopicName3 = testTopicName + "3"
val errorMsgBytes = "When `remote.log.copy.disable` is set to true, the `local.retention.bytes` and `retention.bytes` " +
@ -300,9 +291,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
admin.incrementalAlterConfigs(configs).all().get()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = {
@Test
def testEnableRemoteLogOnExistingTopicTest(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
@ -318,9 +308,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testEnableRemoteLogWhenSystemRemoteStorageIsDisabled(quorum: String): Unit = {
@Test
def testEnableRemoteLogWhenSystemRemoteStorageIsDisabled(): Unit = {
val admin = createAdminClient()
val topicConfigWithRemoteStorage = new Properties()
@ -342,9 +331,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
assertTrue(errorMessage.getMessage.contains("Tiered Storage functionality is disabled in the broker"))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithValidRetentionTimeTest(quorum: String): Unit = {
@Test
def testUpdateTopicConfigWithValidRetentionTimeTest(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@ -363,9 +351,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithValidRetentionSizeTest(quorum: String): Unit = {
@Test
def testUpdateTopicConfigWithValidRetentionSizeTest(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@ -384,9 +371,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithInheritedLocalRetentionTime(quorum: String): Unit = {
@Test
def testUpdateTopicConfigWithInheritedLocalRetentionTime(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@ -404,9 +390,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
() => admin.incrementalAlterConfigs(configs).all().get())
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithInheritedLocalRetentionSize(quorum: String): Unit = {
@Test
def testUpdateTopicConfigWithInheritedLocalRetentionSize(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@ -425,9 +410,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}
// The remote storage config validation on controller level only works in KRaft
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithDisablingRemoteStorage(quorum: String): Unit = {
@Test
def testUpdateTopicConfigWithDisablingRemoteStorage(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@ -446,9 +430,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
"If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithDisablingRemoteStorageWithDeleteOnDisable(quorum: String): Unit = {
@Test
def testUpdateTopicConfigWithDisablingRemoteStorageWithDeleteOnDisable(): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@ -473,9 +456,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(newProps)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testTopicDeletion(quorum: String): Unit = {
@Test
def testTopicDeletion(): Unit = {
MyRemoteStorageManager.deleteSegmentEventCounter.set(0)
val numPartitions = 2
val topicConfig = new Properties()
@ -492,9 +474,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
"Remote log segments should be deleted only once by the leader")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(quorum: String): Unit = {
@Test
def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit = {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@ -510,9 +491,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
faultHandler.setIgnore(true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testClusterWithoutTieredStorageStartsSuccessfullyIfTopicWithTieringDisabled(quorum: String): Unit = {
@Test
def testClusterWithoutTieredStorageStartsSuccessfullyIfTopicWithTieringDisabled(): Unit = {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false.toString)

View File

@ -29,9 +29,7 @@ import org.apache.kafka.server.policy.AlterConfigPolicy
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@ -79,9 +77,8 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
props.put(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[Policy])
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testValidAlterConfigs(quorum: String): Unit = {
@Test
def testValidAlterConfigs(): Unit = {
client = Admin.create(createConfig)
// Create topics
val topic1 = "describe-alter-configs-topic-1"
@ -100,16 +97,14 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, this, topicResource1, topicResource2, maxMessageBytes, retentionMs)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testInvalidAlterConfigs(quorum: String): Unit = {
@Test
def testInvalidAlterConfigs(): Unit = {
client = Admin.create(createConfig)
PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(this, client)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testInvalidAlterConfigsDueToPolicy(quorum: String): Unit = {
@Test
def testInvalidAlterConfigsDueToPolicy(): Unit = {
client = Admin.create(createConfig)
// Create topics

View File

@ -26,8 +26,7 @@ import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.Test
import java.util.Collections
import scala.concurrent.ExecutionException
@ -94,18 +93,16 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest extends DelegationTokenE
createScramAdminClient(kafkaClientSaslMechanism, tokenRequesterPrincipal.getName, tokenRequesterPassword)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateTokenForOtherUserFails(quorum: String): Unit = {
@Test
def testCreateTokenForOtherUserFails(): Unit = {
val thrown = assertThrows(classOf[ExecutionException], () => {
createDelegationTokens(() => new CreateDelegationTokenOptions().owner(otherClientPrincipal), assert = false)
})
assertTrue(thrown.getMessage.contains("Delegation Token authorization failed"))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDescribeTokenForOtherUserFails(quorum: String): Unit = {
@Test
def testDescribeTokenForOtherUserFails(): Unit = {
Using.resource(createScramAdminClient(kafkaClientSaslMechanism, describeTokenFailPrincipal.getName, describeTokenFailPassword)) { describeTokenFailAdminClient =>
Using.resource(createScramAdminClient(kafkaClientSaslMechanism, otherClientPrincipal.getName, otherClientPassword)) { otherClientAdminClient =>
otherClientAdminClient.createDelegationToken().delegationToken().get()
@ -117,9 +114,8 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest extends DelegationTokenE
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDescribeTokenForOtherUserPasses(quorum: String): Unit = {
@Test
def testDescribeTokenForOtherUserPasses(): Unit = {
val adminClient = createTokenRequesterAdminClient()
try {
val tokens = adminClient.describeDelegationToken(

View File

@ -79,9 +79,9 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
/**
* Verifies some of the metrics of producer, consumer as well as server.
*/
@ParameterizedTest(name = "testMetrics with systemRemoteStorageEnabled: {1}")
@CsvSource(Array("kraft, true", "kraft, false"))
def testMetrics(quorum: String, systemRemoteStorageEnabled: Boolean): Unit = {
@ParameterizedTest(name = "testMetrics with systemRemoteStorageEnabled: {0}")
@CsvSource(Array("true", "false"))
def testMetrics(systemRemoteStorageEnabled: Boolean): Unit = {
val topic = "mytopic"
createTopic(topic,
numPartitions = 1,

View File

@ -60,7 +60,7 @@ import org.apache.logging.log4j.core.config.Configurator
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{MethodSource, ValueSource}
import org.junit.jupiter.params.provider.{MethodSource}
import org.slf4j.LoggerFactory
import java.util.AbstractMap.SimpleImmutableEntry
@ -2511,9 +2511,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testListGroups(unused: String): Unit = {
@Test
def testListGroups(): Unit = {
val classicGroupId = "classic_group_id"
val consumerGroupId = "consumer_group_id"
val shareGroupId = "share_group_id"
@ -2643,9 +2642,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kip932"))
def testShareGroups(unused: String): Unit = {
def testShareGroups(): Unit = {
val testGroupId = "test_group_id"
val testClientId = "test_client_id"
val fakeGroupId = "fake_group_id"

View File

@ -21,10 +21,8 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.clients.admin.AdminClientConfig
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.apache.kafka.common.errors.TopicAuthorizationException
// This test case uses a separate listener for client and inter-broker communication, from
@ -88,9 +86,8 @@ class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
superuserClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(interBrokerListenerName))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testListenerName(quorum: String): Unit = {
@Test
def testListenerName(): Unit = {
// To check the client listener name, establish a session on the server by sending any request eg sendRecords
val producer = createProducer()
assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords = 1, tp))

View File

@ -22,8 +22,7 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, Record
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.Test
import java.nio.charset.StandardCharsets
import java.util
@ -50,9 +49,8 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
* Producer will attempt to send messages to the partition specified in each record, and should
* succeed as long as the partition is included in the metadata.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testSendWithTopicDeletionMidWay(quorum: String): Unit = {
@Test
def testSendWithTopicDeletionMidWay(): Unit = {
val numRecords = 10
val topic = "topic"
@ -91,9 +89,8 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
* Producer will attempt to send messages to the partition specified in each record, and should
* succeed as long as the metadata has been updated with new topic id.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testSendWithRecreatedTopic(quorum: String): Unit = {
@Test
def testSendWithRecreatedTopic(): Unit = {
val numRecords = 10
val topic = "topic"
createTopic(topic)
@ -125,9 +122,8 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
* Producer will attempt to send messages to the partition specified in each record, and should
* succeed as long as the metadata cache on the leader includes the partition topic id.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testSendWithTopicReassignmentIsMidWay(quorum: String): Unit = {
@Test
def testSendWithTopicReassignmentIsMidWay(): Unit = {
val numRecords = 10
val topic = "topic"
val partition0: TopicPartition = new TopicPartition(topic, 0)

View File

@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.SaslAuthenticationException
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.common.config.SaslConfigs
@ -91,7 +91,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
closeSasl()
}
@ParameterizedTest(name="{displayName}.quorum=kraft.isIdempotenceEnabled={0}")
@ParameterizedTest(name="{displayName}.isIdempotenceEnabled={0}")
@ValueSource(booleans = Array(true, false))
def testProducerWithAuthenticationFailure(isIdempotenceEnabled: Boolean): Unit = {
val prop = new Properties()
@ -111,9 +111,8 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
verifyWithRetry(sendOneRecord(producer2))()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testTransactionalProducerWithAuthenticationFailure(quorum: String): Unit = {
@Test
def testTransactionalProducerWithAuthenticationFailure(): Unit = {
val txProducer = createTransactionalProducer()
verifyAuthenticationException(txProducer.initTransactions())
@ -157,9 +156,8 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))(_.count == 1)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testKafkaAdminClientWithAuthenticationFailure(quorum: String): Unit = {
@Test
def testKafkaAdminClientWithAuthenticationFailure(): Unit = {
val props = JaasTestUtils.adminClientSecurityConfigs(securityProtocol, OptionConverters.toJava(trustStoreFile), OptionConverters.toJava(clientSaslProperties))
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val adminClient = Admin.create(props)

View File

@ -34,9 +34,7 @@ import org.apache.kafka.common.{KafkaException, Uuid, requests}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.QuotaConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.io.IOException
import java.net.{InetAddress, Socket}
@ -84,9 +82,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
}
@Flaky("KAFKA-17999")
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDynamicConnectionQuota(quorum: String): Unit = {
@Test
def testDynamicConnectionQuota(): Unit = {
val maxConnectionsPerIP = 5
def connectAndVerify(): Unit = {
@ -112,9 +109,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
verifyMaxConnections(maxConnectionsPerIPOverride, connectAndVerify)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDynamicListenerConnectionQuota(quorum: String): Unit = {
@Test
def testDynamicListenerConnectionQuota(): Unit = {
val initialConnectionCount = connectionCount
def connectAndVerify(): Unit = {
@ -185,9 +181,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDynamicListenerConnectionCreationRateQuota(quorum: String): Unit = {
@Test
def testDynamicListenerConnectionCreationRateQuota(): Unit = {
// Create another listener. PLAINTEXT is an inter-broker listener
// keep default limits
val newListenerNames = Seq("PLAINTEXT", "EXTERNAL")
@ -247,9 +242,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
waitForConnectionCount(initialConnectionCount)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDynamicIpConnectionRateQuota(quorum: String): Unit = {
@Test
def testDynamicIpConnectionRateQuota(): Unit = {
val connRateLimit = 10
val initialConnectionCount = connectionCount
// before setting connection rate to 10, verify we can do at least double that by default (no limit)

View File

@ -24,9 +24,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.network.SocketServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.util.Properties
import scala.jdk.CollectionConverters._
@ -66,9 +64,8 @@ class DynamicNumNetworkThreadsTest extends BaseRequestTest {
.count(listener == _.tags().get("listener"))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDynamicNumNetworkThreads(quorum: String): Unit = {
@Test
def testDynamicNumNetworkThreads(): Unit = {
// Increase the base network thread count
val newBaseNetworkThreadsCount = SocketServerConfigs.NUM_NETWORK_THREADS_DEFAULT + 1
var props = new Properties

View File

@ -38,9 +38,9 @@ import org.apache.kafka.common.security.kerberos.KerberosLogin
import org.apache.kafka.common.utils.{LogContext, MockTime}
import org.apache.kafka.network.SocketServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{MethodSource, ValueSource}
import org.junit.jupiter.params.provider.MethodSource
import scala.jdk.CollectionConverters._
@ -92,9 +92,8 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
* Tests that Kerberos replay error `Request is a replay (34)` is not handled as an authentication exception
* since replay detection used to detect DoS attacks may occasionally reject valid concurrent requests.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testRequestIsAReplay(quorum: String): Unit = {
@Test
def testRequestIsAReplay(): Unit = {
val successfulAuthsPerThread = 10
val futures = (0 until numThreads).map(_ => executor.submit(new Runnable {
override def run(): Unit = verifyRetriableFailuresDuringAuthentication(successfulAuthsPerThread)
@ -110,9 +109,8 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
* are able to connect after the second re-login. Verifies that logout is performed only once
* since duplicate logouts without successful login results in NPE from Java 9 onwards.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testLoginFailure(quorum: String): Unit = {
@Test
def testLoginFailure(): Unit = {
val selector = createSelectorWithRelogin()
try {
val login = TestableKerberosLogin.instance
@ -134,9 +132,8 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
* is performed when credentials are unavailable between logout and login, we handle it as a
* transient error and not an authentication failure so that clients may retry.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testReLogin(quorum: String): Unit = {
@Test
def testReLogin(): Unit = {
val selector = createSelectorWithRelogin()
try {
val login = TestableKerberosLogin.instance
@ -166,9 +163,8 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
* Tests that Kerberos error `Server not found in Kerberos database (7)` is handled
* as a fatal authentication failure.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testServerNotFoundInKerberosDatabase(quorum: String): Unit = {
@Test
def testServerNotFoundInKerberosDatabase(): Unit = {
val jaasConfig = clientConfig.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
val invalidServiceConfig = jaasConfig.replace("serviceName=\"kafka\"", "serviceName=\"invalid-service\"")
clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, invalidServiceConfig)

View File

@ -159,10 +159,6 @@ abstract class QuorumTestHarness extends Logging {
private var testInfo: TestInfo = _
protected var implementation: QuorumImplementation = _
def isShareGroupTest(): Boolean = {
TestInfoUtils.isShareGroupTest(testInfo)
}
def maybeGroupProtocolSpecified(): Option[GroupProtocol] = {
TestInfoUtils.maybeGroupProtocolSpecified(testInfo)
}

View File

@ -34,10 +34,6 @@ object TestInfoUtils {
final val TestWithParameterizedGroupProtocolNames = "{displayName}.groupProtocol={0}"
def isShareGroupTest(testInfo: TestInfo): Boolean = {
testInfo.getDisplayName.contains("kip932")
}
def maybeGroupProtocolSpecified(testInfo: TestInfo): Option[GroupProtocol] = {
if (testInfo.getDisplayName.contains("groupProtocol=classic"))
Some(GroupProtocol.CLASSIC)

View File

@ -25,9 +25,7 @@ import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import java.util
import java.util.Arrays.asList
@ -65,9 +63,8 @@ class AddPartitionsTest extends BaseRequestTest {
admin = createAdminClient()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testWrongReplicaCount(quorum: String): Unit = {
@Test
def testWrongReplicaCount(): Unit = {
assertEquals(classOf[InvalidReplicaAssignmentException], assertThrows(classOf[ExecutionException], () => {
admin.createPartitions(Collections.singletonMap(topic1,
NewPartitions.increaseTo(2, singletonList(asList(0, 1, 2))))).all().get()
@ -78,9 +75,8 @@ class AddPartitionsTest extends BaseRequestTest {
* Test that when we supply a manual partition assignment to createTopics, it must be 0-based
* and consecutive.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testMissingPartitionsInCreateTopics(quorum: String): Unit = {
@Test
def testMissingPartitionsInCreateTopics(): Unit = {
val topic6Placements = new util.HashMap[Integer, util.List[Integer]]
topic6Placements.put(1, asList(0, 1))
topic6Placements.put(2, asList(1, 0))
@ -104,9 +100,8 @@ class AddPartitionsTest extends BaseRequestTest {
* Test that when we supply a manual partition assignment to createPartitions, it must contain
* enough partitions.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testMissingPartitionsInCreatePartitions(quorum: String): Unit = {
@Test
def testMissingPartitionsInCreatePartitions(): Unit = {
val cause = assertThrows(classOf[ExecutionException], () =>
admin.createPartitions(Collections.singletonMap(topic1,
NewPartitions.increaseTo(3, singletonList(asList(0, 1, 2))))).all().get()).getCause
@ -115,9 +110,8 @@ class AddPartitionsTest extends BaseRequestTest {
"were specified."), "Unexpected error message: " + cause.getMessage)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testIncrementPartitions(quorum: String): Unit = {
@Test
def testIncrementPartitions(): Unit = {
admin.createPartitions(Collections.singletonMap(topic1, NewPartitions.increaseTo(3))).all().get()
// wait until leader is elected
@ -144,9 +138,8 @@ class AddPartitionsTest extends BaseRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testManualAssignmentOfReplicas(quorum: String): Unit = {
@Test
def testManualAssignmentOfReplicas(): Unit = {
// Add 2 partitions
admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3,
asList(asList(0, 1), asList(2, 3))))).all().get()
@ -173,9 +166,8 @@ class AddPartitionsTest extends BaseRequestTest {
assertEquals(Set(0, 1), replicas.asScala.toSet)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testReplicaPlacementAllServers(quorum: String): Unit = {
@Test
def testReplicaPlacementAllServers(): Unit = {
admin.createPartitions(Collections.singletonMap(topic3, NewPartitions.increaseTo(7))).all().get()
// read metadata from a broker and verify the new topic partitions exist
@ -201,9 +193,8 @@ class AddPartitionsTest extends BaseRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testReplicaPlacementPartialServers(quorum: String): Unit = {
@Test
def testReplicaPlacementPartialServers(): Unit = {
admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3))).all().get()
// read metadata from a broker and verify the new topic partitions exist

View File

@ -1732,9 +1732,8 @@ class PartitionTest extends AbstractPartitionTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testIsrNotExpandedIfReplicaIsFencedOrShutdown(quorum: String): Unit = {
@Test
def testIsrNotExpandedIfReplicaIsFencedOrShutdown(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = topicId.toJava)
seedLogData(log, numRecords = 10, leaderEpoch = 4)

View File

@ -23,17 +23,15 @@ import scala.collection.Seq
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.server.config.ServerLogConfigs
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.Test
class MinIsrConfigTest extends KafkaServerTestHarness {
val overridingProps = new Properties()
overridingProps.put(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, "5")
def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1).map(KafkaConfig.fromProps(_, overridingProps))
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDefaultKafkaConfig(quorum: String): Unit = {
@Test
def testDefaultKafkaConfig(): Unit = {
assert(brokers.head.logManager.initialDefaultConfig.minInSyncReplicas == 5)
}
}

View File

@ -36,9 +36,9 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{MethodSource, ValueSource}
import org.junit.jupiter.params.provider.MethodSource
@Timeout(120)
class MetricsTest extends KafkaServerTestHarness with Logging {
@ -56,9 +56,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
val nMessages = 2
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testMetricsReporterAfterDeletingTopic(quorum: String): Unit = {
@Test
def testMetricsReporterAfterDeletingTopic(): Unit = {
val topic = "test-topic-metric"
createTopic(topic)
deleteTopic(topic)
@ -66,9 +65,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testBrokerTopicMetricsUnregisteredAfterDeletingTopic(quorum: String): Unit = {
@Test
def testBrokerTopicMetricsUnregisteredAfterDeletingTopic(): Unit = {
val topic = "test-broker-topic-metric"
createTopic(topic, 2)
// Produce a few messages to create the metrics
@ -81,33 +79,29 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testClusterIdMetric(quorum: String): Unit = {
@Test
def testClusterIdMetric(): Unit = {
// Check if clusterId metric exists.
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == s"$requiredKafkaServerPrefix=ClusterId"), 1)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testBrokerStateMetric(quorum: String): Unit = {
@Test
def testBrokerStateMetric(): Unit = {
// Check if BrokerState metric exists.
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == s"$requiredKafkaServerPrefix=BrokerState"), 1)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testYammerMetricsCountMetric(quorum: String): Unit = {
@Test
def testYammerMetricsCountMetric(): Unit = {
// Check if yammer-metrics-count metric exists.
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == s"$requiredKafkaServerPrefix=yammer-metrics-count"), 1)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testLinuxIoMetrics(quorum: String): Unit = {
@Test
def testLinuxIoMetrics(): Unit = {
// Check if linux-disk-{read,write}-bytes metrics either do or do not exist depending on whether we are or are not
// able to collect those metrics on the platform where this test is running.
val usable = new LinuxIoMetricsCollector("/proc", Time.SYSTEM).usable()
@ -117,9 +111,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == s"$requiredKafkaServerPrefix=$name"), expectedCount))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testJMXFilter(quorum: String): Unit = {
@Test
def testJMXFilter(): Unit = {
// Check if cluster id metrics is not exposed in JMX
assertTrue(ManagementFactory.getPlatformMBeanServer
.isRegistered(new ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount")))
@ -127,9 +120,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
.isRegistered(new ObjectName(s"$requiredKafkaServerPrefix=ClusterId")))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUpdateJMXFilter(quorum: String): Unit = {
@Test
def testUpdateJMXFilter(): Unit = {
// verify previously exposed metrics are removed and existing matching metrics are added
brokers.foreach(broker => broker.kafkaYammerMetrics.reconfigure(
Map(JmxReporter.EXCLUDE_CONFIG -> "kafka.controller:type=KafkaController,name=ActiveControllerCount").asJava
@ -140,9 +132,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
.isRegistered(new ObjectName(s"$requiredKafkaServerPrefix=ClusterId")))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testGeneralBrokerTopicMetricsAreGreedilyRegistered(quorum: String): Unit = {
@Test
def testGeneralBrokerTopicMetricsAreGreedilyRegistered(): Unit = {
val topic = "test-broker-topic-metric"
createTopic(topic, 2)
@ -156,9 +147,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics aren't registered")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testWindowsStyleTagNames(quorum: String): Unit = {
@Test
def testWindowsStyleTagNames(): Unit = {
val path = "C:\\windows-path\\kafka-logs"
val tags = Map("dir" -> path)
val expectedMBeanName = Set(tags.keySet.head, ObjectName.quote(path)).mkString("=")
@ -213,9 +203,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertTrue(TestUtils.meterCount(bytesOut) > initialBytesOut)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testKRaftControllerMetrics(quorum: String): Unit = {
@Test
def testKRaftControllerMetrics(): Unit = {
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
Set(
"kafka.controller:type=KafkaController,name=ActiveControllerCount",

View File

@ -41,8 +41,6 @@ import org.apache.kafka.server.authorizer._
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.net.InetAddress
import java.util
@ -52,7 +50,6 @@ import scala.jdk.CollectionConverters._
class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
private final val PLAINTEXT = new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "127.0.0.1", 9020)
private final val KRAFT = "kraft"
private val allowReadAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, READ, ALLOW)
private val allowWriteAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, WRITE, ALLOW)
@ -104,32 +101,28 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
super.tearDown()
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAuthorizeThrowsOnNonLiteralResource(quorum: String): Unit = {
@Test
def testAuthorizeThrowsOnNonLiteralResource(): Unit = {
assertThrows(classOf[IllegalArgumentException], () => authorize(authorizer1, requestContext, READ,
new ResourcePattern(TOPIC, "something", PREFIXED)))
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAuthorizeWithEmptyResourceName(quorum: String): Unit = {
@Test
def testAuthorizeWithEmptyResourceName(): Unit = {
assertFalse(authorize(authorizer1, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL)))
addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(GROUP, WILDCARD_RESOURCE, LITERAL))
assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(GROUP, "", LITERAL)))
}
// Authorizing the empty resource is not supported because empty resource name is invalid.
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testEmptyAclThrowsException(quorum: String): Unit = {
@Test
def testEmptyAclThrowsException(): Unit = {
assertThrows(classOf[ApiException],
() => addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(GROUP, "", LITERAL)))
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testTopicAcl(quorum: String): Unit = {
@Test
def testTopicAcl(): Unit = {
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
@ -183,9 +176,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
/**
* CustomPrincipals should be compared with their principal type and name
*/
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAllowAccessWithCustomPrincipal(quorum: String): Unit = {
@Test
def testAllowAccessWithCustomPrincipal(): Unit = {
val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, username)
val host1 = InetAddress.getByName("192.168.1.1")
@ -204,9 +196,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertFalse(authorize(authorizer1, host1Context, READ, resource), "User1 should not have READ access from host1 due to denyAcl")
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testDenyTakesPrecedence(quorum: String): Unit = {
@Test
def testDenyTakesPrecedence(): Unit = {
val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val host = InetAddress.getByName("192.168.2.1")
val session = newRequestContext(user, host)
@ -220,9 +211,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertFalse(authorize(authorizer1, session, READ, resource), "deny should take precedence over allow.")
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAllowAllAccess(quorum: String): Unit = {
@Test
def testAllowAllAccess(): Unit = {
val allowAllAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL, ALLOW)
changeAclAndVerify(Set.empty, Set(allowAllAcl), Set.empty)
@ -231,9 +221,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertTrue(authorize(authorizer1, context, READ, resource), "allow all acl should allow access to all.")
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testSuperUserHasAccess(quorum: String): Unit = {
@Test
def testSuperUserHasAccess(): Unit = {
val denyAllAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL, DENY)
changeAclAndVerify(Set.empty, Set(denyAllAcl), Set.empty)
@ -248,9 +237,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
/**
* CustomPrincipals should be compared with their principal type and name
*/
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testSuperUserWithCustomPrincipalHasAccess(quorum: String): Unit = {
@Test
def testSuperUserWithCustomPrincipalHasAccess(): Unit = {
val denyAllAcl = new AccessControlEntry(WILDCARD_PRINCIPAL_STRING, WILDCARD_HOST, AclOperation.ALL, DENY)
changeAclAndVerify(Set.empty, Set(denyAllAcl), Set.empty)
@ -259,9 +247,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertTrue(authorize(authorizer1, session, READ, resource), "superuser with custom principal always has access, no matter what acls.")
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testWildCardAcls(quorum: String): Unit = {
@Test
def testWildCardAcls(): Unit = {
assertFalse(authorize(authorizer1, requestContext, READ, resource), "when acls = [], authorizer should fail close.")
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
@ -284,15 +271,13 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertFalse(authorize(authorizer1, host1Context, WRITE, resource), "User1 should not have WRITE access from host1")
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testNoAclFound(quorum: String): Unit = {
@Test
def testNoAclFound(): Unit = {
assertFalse(authorize(authorizer1, requestContext, READ, resource), "when acls = [], authorizer should deny op.")
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testNoAclFoundOverride(quorum: String): Unit = {
@Test
def testNoAclFoundOverride(): Unit = {
val props = properties
props.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true")
@ -307,9 +292,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAclConfigWithWhitespace(quorum: String): Unit = {
@Test
def testAclConfigWithWhitespace(): Unit = {
val props = properties
props.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, " true")
// replace all property values with leading & trailing whitespaces
@ -325,9 +309,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAclManagementAPIs(quorum: String): Unit = {
@Test
def testAclManagementAPIs(): Unit = {
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
val host1 = "host1"
@ -393,9 +376,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
/**
* Test ACL inheritance, as described in #{org.apache.kafka.common.acl.AclOperation}
*/
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAclInheritance(quorum: String): Unit = {
@Test
def testAclInheritance(): Unit = {
testImplicationsOfAllow(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, CREATE_TOKENS, DESCRIBE_TOKENS, TWO_PHASE_COMMIT))
testImplicationsOfDeny(AclOperation.ALL, Set(READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE,
@ -442,17 +424,15 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
removeAcls(authorizer1, acls, clusterResource)
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAccessAllowedIfAllowAclExistsOnWildcardResource(quorum: String): Unit = {
@Test
def testAccessAllowedIfAllowAclExistsOnWildcardResource(): Unit = {
addAcls(authorizer1, Set(allowReadAcl), wildCardResource)
assertTrue(authorize(authorizer1, requestContext, READ, resource))
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testDeleteAclOnWildcardResource(quorum: String): Unit = {
@Test
def testDeleteAclOnWildcardResource(): Unit = {
addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), wildCardResource)
removeAcls(authorizer1, Set(allowReadAcl), wildCardResource)
@ -460,9 +440,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertEquals(Set(allowWriteAcl), getAcls(authorizer1, wildCardResource))
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testDeleteAllAclOnWildcardResource(quorum: String): Unit = {
@Test
def testDeleteAllAclOnWildcardResource(): Unit = {
addAcls(authorizer1, Set(allowReadAcl), wildCardResource)
removeAcls(authorizer1, Set.empty, wildCardResource)
@ -470,17 +449,15 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertEquals(Set.empty, getAcls(authorizer1))
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAccessAllowedIfAllowAclExistsOnPrefixedResource(quorum: String): Unit = {
@Test
def testAccessAllowedIfAllowAclExistsOnPrefixedResource(): Unit = {
addAcls(authorizer1, Set(allowReadAcl), prefixedResource)
assertTrue(authorize(authorizer1, requestContext, READ, resource))
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testDeleteAclOnPrefixedResource(quorum: String): Unit = {
@Test
def testDeleteAclOnPrefixedResource(): Unit = {
addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
removeAcls(authorizer1, Set(allowReadAcl), prefixedResource)
@ -488,9 +465,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertEquals(Set(allowWriteAcl), getAcls(authorizer1, prefixedResource))
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testDeleteAllAclOnPrefixedResource(quorum: String): Unit = {
@Test
def testDeleteAllAclOnPrefixedResource(): Unit = {
addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
removeAcls(authorizer1, Set.empty, prefixedResource)
@ -498,9 +474,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertEquals(Set.empty, getAcls(authorizer1))
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAddAclsOnLiteralResource(quorum: String): Unit = {
@Test
def testAddAclsOnLiteralResource(): Unit = {
addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), resource)
addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), resource)
@ -509,9 +484,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertEquals(Set.empty, getAcls(authorizer1, prefixedResource))
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAddAclsOnWildcardResource(quorum: String): Unit = {
@Test
def testAddAclsOnWildcardResource(): Unit = {
addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), wildCardResource)
addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), wildCardResource)
@ -520,9 +494,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertEquals(Set.empty, getAcls(authorizer1, prefixedResource))
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAddAclsOnPrefixedResource(quorum: String): Unit = {
@Test
def testAddAclsOnPrefixedResource(): Unit = {
addAcls(authorizer1, Set(allowReadAcl, allowWriteAcl), prefixedResource)
addAcls(authorizer1, Set(allowWriteAcl, denyReadAcl), prefixedResource)
@ -531,9 +504,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertEquals(Set.empty, getAcls(authorizer1, resource))
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAuthorizeWithPrefixedResource(quorum: String): Unit = {
@Test
def testAuthorizeWithPrefixedResource(): Unit = {
addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", LITERAL))
addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "a_other", PREFIXED))
addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), PREFIXED))
@ -552,9 +524,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertTrue(authorize(authorizer1, requestContext, READ, resource))
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testSingleCharacterResourceAcls(quorum: String): Unit = {
@Test
def testSingleCharacterResourceAcls(): Unit = {
addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC, "f", LITERAL))
assertTrue(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "f", LITERAL)))
assertFalse(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "foo", LITERAL)))
@ -565,9 +536,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertFalse(authorize(authorizer1, requestContext, READ, new ResourcePattern(TOPIC, "foo_", LITERAL)))
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testGetAclsPrincipal(quorum: String): Unit = {
@Test
def testGetAclsPrincipal(): Unit = {
val aclOnSpecificPrincipal = new AccessControlEntry(principal.toString, WILDCARD_HOST, WRITE, ALLOW)
addAcls(authorizer1, Set(aclOnSpecificPrincipal), resource)
@ -586,9 +556,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertEquals(0, getAcls(authorizer1, principal).size, "acl on wildcard should not be returned for specific request")
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAclsFilter(quorum: String): Unit = {
@Test
def testAclsFilter(): Unit = {
val resource1 = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
val resource2 = new ResourcePattern(TOPIC, "bar-" + UUID.randomUUID(), LITERAL)
val prefixedResource = new ResourcePattern(TOPIC, "bar-", PREFIXED)
@ -622,9 +591,8 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
assertEquals(Set.empty, deleteResults(3).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
}
@ParameterizedTest
@ValueSource(strings = Array(KRAFT))
def testAuthorizeByResourceTypeNoAclFoundOverride(quorum: String): Unit = {
@Test
def testAuthorizeByResourceTypeNoAclFoundOverride(): Unit = {
val props = properties
props.put(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true")

View File

@ -32,9 +32,9 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, FindCoordinatorRequest, FindCoordinatorResponse, InitProducerIdRequest, InitProducerIdResponse}
import org.apache.kafka.server.config.ServerLogConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource}
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@ -55,7 +55,7 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {
@ParameterizedTest
@MethodSource(value = Array("parameters"))
def shouldReceiveOperationNotAttemptedWhenOtherPartitionHasError(quorum: String, version: Short): Unit = {
def shouldReceiveOperationNotAttemptedWhenOtherPartitionHasError(version: Short): Unit = {
// The basic idea is that we have one unknown topic and one created topic. We should get the 'UNKNOWN_TOPIC_OR_PARTITION'
// error for the unknown topic and the 'OPERATION_NOT_ATTEMPTED' error for the known and authorized topic.
val nonExistentTopic = new TopicPartition("unknownTopic", 0)
@ -110,9 +110,8 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, errors.get(nonExistentTopic))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testOneSuccessOneErrorInBatchedRequest(quorum: String): Unit = {
@Test
def testOneSuccessOneErrorInBatchedRequest(): Unit = {
val tp0 = new TopicPartition(topic1, 0)
val transactionalId1 = "foobar"
val transactionalId2 = "barfoo" // "barfoo" maps to the same transaction coordinator
@ -149,9 +148,8 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {
assertEquals(expectedErrors, errors)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testVerifyOnly(quorum: String): Unit = {
@Test
def testVerifyOnly(): Unit = {
val tp0 = new TopicPartition(topic1, 0)
val transactionalId = "foobar"
@ -209,7 +207,7 @@ object AddPartitionsToTxnRequestServerTest {
def parameters: JStream[Arguments] = {
val arguments = mutable.ListBuffer[Arguments]()
ApiKeys.ADD_PARTITIONS_TO_TXN.allVersions().forEach { version =>
arguments += Arguments.of("kraft", version)
arguments += Arguments.of(version)
}
arguments.asJava.stream()
}

View File

@ -27,8 +27,7 @@ import org.apache.kafka.common.requests.{AlterReplicaLogDirsRequest, AlterReplic
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.storage.internals.log.LogFileUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.Test
import java.util.Properties
import scala.jdk.CollectionConverters._
@ -52,9 +51,8 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
.find(p => p.partitionIndex == tp.partition).get.errorCode)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterReplicaLogDirsRequest(quorum: String): Unit = {
@Test
def testAlterReplicaLogDirsRequest(): Unit = {
val partitionNum = 5
// Alter replica dir before topic creation
@ -88,9 +86,8 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterReplicaLogDirsRequestErrorCode(quorum: String): Unit = {
@Test
def testAlterReplicaLogDirsRequestErrorCode(): Unit = {
val offlineDir = new File(brokers.head.config.logDirs.tail.head).getAbsolutePath
val validDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath
val validDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath
@ -127,9 +124,8 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterReplicaLogDirsRequestWithRetention(quorum: String): Unit = {
@Test
def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
val partitionNum = 1
// Alter replica dir before topic creation

View File

@ -25,9 +25,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse}
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.TestInfo
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{Test, TestInfo}
import java.util
import java.util.Properties
@ -54,9 +52,8 @@ class AlterUserScramCredentialsRequestNotAuthorizedTest extends BaseRequestTest
private val user1 = "user1"
private val user2 = "user2"
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterNothingNotAuthorized(quorum: String): Unit = {
@Test
def testAlterNothingNotAuthorized(): Unit = {
val request = new AlterUserScramCredentialsRequest.Builder(
new AlterUserScramCredentialsRequestData()
.setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
@ -67,9 +64,8 @@ class AlterUserScramCredentialsRequestNotAuthorizedTest extends BaseRequestTest
assertEquals(0, results.size)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterSomethingNotAuthorized(quorum: String): Unit = {
@Test
def testAlterSomethingNotAuthorized(): Unit = {
val request = new AlterUserScramCredentialsRequest.Builder(
new AlterUserScramCredentialsRequestData()
.setDeletions(util.Arrays.asList(new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)))

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@ -79,9 +79,8 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
private val user3 = "user3@user3.com"
private val unknownUser = "unknownUser"
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterNothing(quorum: String): Unit = {
@Test
def testAlterNothing(): Unit = {
val request = new AlterUserScramCredentialsRequest.Builder(
new AlterUserScramCredentialsRequestData()
.setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
@ -92,9 +91,8 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
assertEquals(0, results.size)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterSameThingTwice(quorum: String): Unit = {
@Test
def testAlterSameThingTwice(): Unit = {
val deletion1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
val deletion2 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
val upsertion1 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
@ -133,9 +131,8 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
})
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterEmptyUser(quorum: String): Unit = {
@Test
def testAlterEmptyUser(): Unit = {
val deletionEmpty = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("").setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
val upsertionEmpty = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("").setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
.setIterations(4096).setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
@ -162,9 +159,8 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
})
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterUnknownMechanism(quorum: String): Unit = {
@Test
def testAlterUnknownMechanism(): Unit = {
val deletionUnknown1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.UNKNOWN.`type`)
val deletionValid1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
val deletionUnknown2 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user2).setMechanism(10.toByte)
@ -190,9 +186,8 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
results.asScala.foreach(result => assertEquals("Unknown SCRAM mechanism", result.errorMessage))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterTooFewIterations(quorum: String): Unit = {
@Test
def testAlterTooFewIterations(): Unit = {
val upsertionTooFewIterations = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1)
.setMechanism(ScramMechanism.SCRAM_SHA_256.`type`).setIterations(1)
.setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
@ -207,9 +202,8 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
assertEquals("Too few iterations", results.get(0).errorMessage)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterTooManyIterations(quorum: String): Unit = {
@Test
def testAlterTooManyIterations(): Unit = {
val upsertionTooFewIterations = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1)
.setMechanism(ScramMechanism.SCRAM_SHA_256.`type`).setIterations(Integer.MAX_VALUE)
.setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
@ -224,9 +218,8 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
assertEquals("Too many iterations", results.get(0).errorMessage)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDeleteSomethingThatDoesNotExist(quorum: String): Unit = {
@Test
def testDeleteSomethingThatDoesNotExist(): Unit = {
val request = new AlterUserScramCredentialsRequest.Builder(
new AlterUserScramCredentialsRequestData()
.setDeletions(util.Arrays.asList(new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)))
@ -238,9 +231,8 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
checkAllErrorsAlteringCredentials(results, Errors.RESOURCE_NOT_FOUND, "when deleting a non-existing credential")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAlterAndDescribe(quorum: String): Unit = {
@Test
def testAlterAndDescribe(): Unit = {
// create a bunch of credentials
val request1_0 = new AlterUserScramCredentialsRequest.Builder(
new AlterUserScramCredentialsRequestData()

View File

@ -49,9 +49,7 @@ import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@ -126,9 +124,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
waitUserQuota(ThrottledPrincipal.getName, ControllerMutationRate)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testSetUnsetQuota(quorum: String): Unit = {
@Test
def testSetUnsetQuota(): Unit = {
val rate = 1.5
val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "User")
// Default Value
@ -143,9 +140,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
waitUserQuota(principal.getName, Long.MaxValue)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testQuotaMetric(quorum: String): Unit = {
@Test
def testQuotaMetric(): Unit = {
asPrincipal(ThrottledPrincipal) {
// Metric is lazily created
assertTrue(quotaMetric(principal.getName).isEmpty)
@ -166,9 +162,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testStrictCreateTopicsRequest(quorum: String): Unit = {
@Test
def testStrictCreateTopicsRequest(): Unit = {
asPrincipal(ThrottledPrincipal) {
// Create two topics worth of 30 partitions each. As we use a strict quota, we
// expect one to be created and one to be rejected.
@ -190,9 +185,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testPermissiveCreateTopicsRequest(quorum: String): Unit = {
@Test
def testPermissiveCreateTopicsRequest(): Unit = {
asPrincipal(ThrottledPrincipal) {
// Create two topics worth of 30 partitions each. As we use a permissive quota, we
// expect both topics to be created.
@ -204,9 +198,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUnboundedCreateTopicsRequest(quorum: String): Unit = {
@Test
def testUnboundedCreateTopicsRequest(): Unit = {
asPrincipal(UnboundedPrincipal) {
// Create two topics worth of 30 partitions each. As we use an user without quota, we
// expect both topics to be created. The throttle time should be equal to 0.
@ -216,9 +209,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testStrictDeleteTopicsRequest(quorum: String): Unit = {
@Test
def testStrictDeleteTopicsRequest(): Unit = {
asPrincipal(UnboundedPrincipal) {
createTopics(TopicsWith30Partitions, StrictCreateTopicsRequestVersion)
}
@ -244,9 +236,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testPermissiveDeleteTopicsRequest(quorum: String): Unit = {
@Test
def testPermissiveDeleteTopicsRequest(): Unit = {
asPrincipal(UnboundedPrincipal) {
createTopics(TopicsWith30Partitions, StrictCreateTopicsRequestVersion)
}
@ -262,9 +253,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUnboundedDeleteTopicsRequest(quorum: String): Unit = {
@Test
def testUnboundedDeleteTopicsRequest(): Unit = {
asPrincipal(UnboundedPrincipal) {
createTopics(TopicsWith30Partitions, StrictCreateTopicsRequestVersion)
@ -276,9 +266,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testStrictCreatePartitionsRequest(quorum: String): Unit = {
@Test
def testStrictCreatePartitionsRequest(): Unit = {
asPrincipal(UnboundedPrincipal) {
createTopics(TopicsWithOnePartition, StrictCreatePartitionsRequestVersion)
}
@ -304,9 +293,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testPermissiveCreatePartitionsRequest(quorum: String): Unit = {
@Test
def testPermissiveCreatePartitionsRequest(): Unit = {
asPrincipal(UnboundedPrincipal) {
createTopics(TopicsWithOnePartition, StrictCreatePartitionsRequestVersion)
}
@ -322,9 +310,8 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUnboundedCreatePartitionsRequest(quorum: String): Unit = {
@Test
def testUnboundedCreatePartitionsRequest(): Unit = {
asPrincipal(UnboundedPrincipal) {
createTopics(TopicsWithOnePartition, StrictCreatePartitionsRequestVersion)

View File

@ -24,16 +24,14 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCol
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.CreateTopicsRequest
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.Test
import scala.jdk.CollectionConverters._
class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testValidCreateTopicsRequests(quorum: String): Unit = {
@Test
def testValidCreateTopicsRequests(): Unit = {
// Generated assignments
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic1"))))
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic2", replicationFactor = 3))))
@ -61,9 +59,8 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
topicReq("topic14", replicationFactor = -1, numPartitions = 2))))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testErrorCreateTopicsRequests(quorum: String): Unit = {
@Test
def testErrorCreateTopicsRequests(): Unit = {
val existingTopic = "existing-topic"
createTopic(existingTopic)
// Basic
@ -99,9 +96,8 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
validateTopicExists("partial-none")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testInvalidCreateTopicsRequests(quorum: String): Unit = {
@Test
def testInvalidCreateTopicsRequests(): Unit = {
// Partitions/ReplicationFactor and ReplicaAssignment
validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq("bad-args-topic", numPartitions = 10, replicationFactor = 3,
@ -114,9 +110,8 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)), checkErrorMessage = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateTopicsRequestVersions(quorum: String): Unit = {
@Test
def testCreateTopicsRequestVersions(): Unit = {
for (version <- ApiKeys.CREATE_TOPICS.oldestVersion to ApiKeys.CREATE_TOPICS.latestVersion) {
val topic = s"topic_$version"
val data = new CreateTopicsRequestData()
@ -153,9 +148,8 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateClusterMetadataTopic(quorum: String): Unit = {
@Test
def testCreateClusterMetadataTopic(): Unit = {
validateErrorCreateTopicsRequests(
topicsReq(Seq(topicReq(Topic.CLUSTER_METADATA_TOPIC_NAME))),
Map(Topic.CLUSTER_METADATA_TOPIC_NAME ->

View File

@ -26,9 +26,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.server.config.ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG
import org.apache.kafka.server.policy.CreateTopicPolicy
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
import org.junit.jupiter.api.TestInfo
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{Test, TestInfo}
import scala.jdk.CollectionConverters._
@ -46,9 +44,8 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
Seq(properties)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testValidCreateTopicsRequests(quorum: String): Unit = {
@Test
def testValidCreateTopicsRequests(): Unit = {
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic1",
numPartitions = 5))))
@ -65,9 +62,8 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
assignment = Map(0 -> List(1, 0), 1 -> List(0, 1))))))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testErrorCreateTopicsRequests(quorum: String): Unit = {
@Test
def testErrorCreateTopicsRequests(): Unit = {
val existingTopic = "existing-topic"
createTopic(existingTopic, 5)

View File

@ -21,10 +21,8 @@ import kafka.security.JaasTestUtils
import java.util
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.errors.UnsupportedByAuthenticationException
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.concurrent.ExecutionException
import scala.jdk.javaapi.OptionConverters
@ -48,9 +46,8 @@ class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
config
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDelegationTokenRequests(quorum: String): Unit = {
@Test
def testDelegationTokenRequests(): Unit = {
adminClient = Admin.create(createAdminConfig)
val createResult = adminClient.createDelegationToken()

View File

@ -25,9 +25,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.SecurityUtils
import org.apache.kafka.server.config.DelegationTokenManagerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.util
import scala.concurrent.ExecutionException
@ -65,9 +63,8 @@ class DelegationTokenRequestsTest extends IntegrationTestHarness with SaslSetup
config
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDelegationTokenRequests(quorum: String): Unit = {
@Test
def testDelegationTokenRequests(): Unit = {
adminClient = Admin.create(createAdminConfig)
// create token1 with renewer1

View File

@ -22,9 +22,7 @@ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.errors.DelegationTokenDisabledException
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.util
import scala.concurrent.ExecutionException
@ -55,9 +53,8 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
config
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDelegationTokenRequests(quorum: String): Unit = {
@Test
def testDelegationTokenRequests(): Unit = {
adminClient = Admin.create(createAdminConfig)
val createResult = adminClient.createDelegationToken()

View File

@ -25,8 +25,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse}
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.Test
import java.util.Collections
import java.util.concurrent.TimeUnit
@ -36,9 +35,8 @@ class DeleteRecordsRequestTest extends BaseRequestTest {
private val TIMEOUT_MS = 1000
private val MESSAGES_PRODUCED_PER_PARTITION = 10
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDeleteRecordsHappyCase(quorum: String): Unit = {
@Test
def testDeleteRecordsHappyCase(): Unit = {
val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords
// Create the DeleteRecord request requesting deletion of offset which is not present
@ -61,9 +59,8 @@ class DeleteRecordsRequestTest extends BaseRequestTest {
validateLogStartOffsetForTopic(topicPartition, offsetToDelete)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testErrorWhenDeletingRecordsWithInvalidOffset(quorum: String): Unit = {
@Test
def testErrorWhenDeletingRecordsWithInvalidOffset(): Unit = {
val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords
// Create the DeleteRecord request requesting deletion of offset which is not present
@ -86,9 +83,8 @@ class DeleteRecordsRequestTest extends BaseRequestTest {
validateLogStartOffsetForTopic(topicPartition, 0)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testErrorWhenDeletingRecordsWithInvalidTopic(quorum: String): Unit = {
@Test
def testErrorWhenDeletingRecordsWithInvalidTopic(): Unit = {
val invalidTopicPartition = new TopicPartition("invalid-topic", 0)
// Create the DeleteRecord request requesting deletion of offset which is not present
val offsetToDelete = 1

View File

@ -28,17 +28,15 @@ import org.apache.kafka.common.requests.DeleteTopicsResponse
import org.apache.kafka.common.requests.MetadataRequest
import org.apache.kafka.common.requests.MetadataResponse
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.Test
import scala.collection.Seq
import scala.jdk.CollectionConverters._
class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testTopicDeletionClusterHasOfflinePartitions(quorum: String): Unit = {
@Test
def testTopicDeletionClusterHasOfflinePartitions(): Unit = {
// Create two topics with one partition/replica. Make one of them offline.
val offlineTopic = "topic-1"
val onlineTopic = "topic-2"
@ -70,9 +68,8 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
"The topics are found in the Broker's cache")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testValidDeleteTopicRequests(quorum: String): Unit = {
@Test
def testValidDeleteTopicRequests(): Unit = {
val timeout = 10000
// Single topic
createTopic("topic-1")
@ -138,9 +135,8 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
connectAndReceive[DeleteTopicsResponse](request, destination = socketServer)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDeleteTopicsVersions(quorum: String): Unit = {
@Test
def testDeleteTopicsVersions(): Unit = {
val timeout = 10000
for (version <- ApiKeys.DELETE_TOPICS.oldestVersion to ApiKeys.DELETE_TOPICS.latestVersion) {
info(s"Creating and deleting tests for version $version")

View File

@ -25,9 +25,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse}
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.TestInfo
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{Test, TestInfo}
class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest {
@ -48,9 +46,8 @@ class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest {
props.map(KafkaConfig.fromProps)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDeleteRecordsRequest(quorum: String): Unit = {
@Test
def testDeleteRecordsRequest(): Unit = {
val topic = "topic-1"
val request = new DeleteTopicsRequest.Builder(
new DeleteTopicsRequestData()

View File

@ -27,9 +27,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import java.lang.{Byte => JByte}
import java.util.Properties
@ -48,15 +46,13 @@ class DescribeClusterRequestTest extends BaseRequestTest {
doSetup(testInfo, createOffsetsTopic = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDescribeClusterRequestIncludingClusterAuthorizedOperations(quorum: String): Unit = {
@Test
def testDescribeClusterRequestIncludingClusterAuthorizedOperations(): Unit = {
testDescribeClusterRequest(true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDescribeClusterRequestExcludingClusterAuthorizedOperations(quorum: String): Unit = {
@Test
def testDescribeClusterRequestExcludingClusterAuthorizedOperations(): Unit = {
testDescribeClusterRequest(false)
}

View File

@ -25,8 +25,7 @@ import org.apache.kafka.common.message.DescribeLogDirsRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.Test
import scala.jdk.CollectionConverters._
@ -39,9 +38,8 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1)
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDescribeLogDirsRequest(quorum: String): Unit = {
@Test
def testDescribeLogDirsRequest(): Unit = {
val onlineDir = new File(brokers.head.config.logDirs.head).getAbsolutePath
val offlineDir = new File(brokers.head.config.logDirs.tail.head).getAbsolutePath
brokers.head.replicaManager.handleLogDirFailure(offlineDir)

View File

@ -24,8 +24,7 @@ import org.apache.kafka.common.requests.{DescribeUserScramCredentialsRequest, De
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.Test
import java.util.Properties
@ -39,9 +38,8 @@ class DescribeUserScramCredentialsRequestNotAuthorizedTest extends BaseRequestTe
properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[DescribeCredentialsTest.TestPrincipalBuilderReturningUnauthorized].getName)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDescribeNotAuthorized(quorum: String): Unit = {
@Test
def testDescribeNotAuthorized(): Unit = {
val request = new DescribeUserScramCredentialsRequest.Builder(
new DescribeUserScramCredentialsRequestData()).build()
val response = sendDescribeUserScramCredentialsRequest(request)

View File

@ -28,8 +28,6 @@ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
@ -48,9 +46,8 @@ class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
super.setUp(testInfo)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDescribeNothing(quorum: String): Unit = {
@Test
def testDescribeNothing(): Unit = {
val request = new DescribeUserScramCredentialsRequest.Builder(
new DescribeUserScramCredentialsRequestData()).build()
val response = sendDescribeUserScramCredentialsRequest(request)
@ -60,9 +57,8 @@ class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
assertEquals(0, response.data.results.size, "Expected no credentials when describing everything and there are no credentials")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDescribeWithNull(quorum: String): Unit = {
@Test
def testDescribeWithNull(): Unit = {
val request = new DescribeUserScramCredentialsRequest.Builder(
new DescribeUserScramCredentialsRequestData().setUsers(null)).build()
val response = sendDescribeUserScramCredentialsRequest(request)
@ -82,9 +78,8 @@ class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
assertEquals(Errors.NONE.code, error, "Did not expect controller error when routed to non-controller")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDescribeSameUserTwice(quorum: String): Unit = {
@Test
def testDescribeSameUserTwice(): Unit = {
val user = "user1"
val userName = new UserName().setName(user)
val request = new DescribeUserScramCredentialsRequest.Builder(
@ -98,9 +93,8 @@ class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
assertEquals(s"Cannot describe SCRAM credentials for the same user twice in a single request: $user", result.errorMessage)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUnknownUser(quorum: String): Unit = {
@Test
def testUnknownUser(): Unit = {
val unknownUser = "unknownUser"
val request = new DescribeUserScramCredentialsRequest.Builder(
new DescribeUserScramCredentialsRequestData().setUsers(List(new UserName().setName(unknownUser)).asJava)).build()

View File

@ -40,8 +40,6 @@ import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog}
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
@ -61,9 +59,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
List(KafkaConfig.fromProps(cfg))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConfigChange(quorum: String): Unit = {
@Test
def testConfigChange(): Unit = {
val oldVal: java.lang.Long = 100000L
val newVal: java.lang.Long = 200000L
val tp = new TopicPartition("test", 0)
@ -95,9 +92,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDynamicTopicConfigChange(quorum: String): Unit = {
@Test
def testDynamicTopicConfigChange(): Unit = {
val tp = new TopicPartition("test", 0)
val oldSegmentSize = 1000
val logProps = new Properties()
@ -180,59 +176,52 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testClientIdQuotaConfigChange(quorum: String): Unit = {
@Test
def testClientIdQuotaConfigChange(): Unit = {
val m = new util.HashMap[String, String]
m.put(CLIENT_ID, "testClient")
testQuotaConfigChange(new ClientQuotaEntity(m), KafkaPrincipal.ANONYMOUS, "testClient")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUserQuotaConfigChange(quorum: String): Unit = {
@Test
def testUserQuotaConfigChange(): Unit = {
val m = new util.HashMap[String, String]
m.put(USER, "ANONYMOUS")
testQuotaConfigChange(new ClientQuotaEntity(m), KafkaPrincipal.ANONYMOUS, "testClient")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUserClientIdQuotaChange(quorum: String): Unit = {
@Test
def testUserClientIdQuotaChange(): Unit = {
val m = new util.HashMap[String, String]
m.put(USER, "ANONYMOUS")
m.put(CLIENT_ID, "testClient")
testQuotaConfigChange(new ClientQuotaEntity(m), KafkaPrincipal.ANONYMOUS, "testClient")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDefaultClientIdQuotaConfigChange(quorum: String): Unit = {
@Test
def testDefaultClientIdQuotaConfigChange(): Unit = {
val m = new util.HashMap[String, String]
m.put(CLIENT_ID, null)
testQuotaConfigChange(new ClientQuotaEntity(m), KafkaPrincipal.ANONYMOUS, "testClient")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDefaultUserQuotaConfigChange(quorum: String): Unit = {
@Test
def testDefaultUserQuotaConfigChange(): Unit = {
val m = new util.HashMap[String, String]
m.put(USER, null)
testQuotaConfigChange(new ClientQuotaEntity(m), KafkaPrincipal.ANONYMOUS, "testClient")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDefaultUserClientIdQuotaConfigChange(quorum: String): Unit = {
@Test
def testDefaultUserClientIdQuotaConfigChange(): Unit = {
val m = new util.HashMap[String, String]
m.put(USER, null)
m.put(CLIENT_ID, null)
testQuotaConfigChange(new ClientQuotaEntity(m), KafkaPrincipal.ANONYMOUS, "testClient")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testIpQuotaInitialization(quorum: String): Unit = {
@Test
def testIpQuotaInitialization(): Unit = {
val broker = brokers.head
val admin = createAdminClient()
try {
@ -252,9 +241,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testIpQuotaConfigChange(quorum: String): Unit = {
@Test
def testIpQuotaConfigChange(): Unit = {
val admin = createAdminClient()
try {
val alterations = util.Arrays.asList(
@ -296,9 +284,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
private def tempTopic() : String = "testTopic" + random.nextInt(1000000)
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConfigChangeOnNonExistingTopicWithAdminClient(quorum: String): Unit = {
@Test
def testConfigChangeOnNonExistingTopicWithAdminClient(): Unit = {
val topic = tempTopic()
val admin = createAdminClient()
try {
@ -314,9 +301,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testIncrementalAlterDefaultTopicConfig(quorum: String): Unit = {
@Test
def testIncrementalAlterDefaultTopicConfig(): Unit = {
val admin = createAdminClient()
try {
val resource = new ConfigResource(ConfigResource.Type.TOPIC, "")
@ -346,9 +332,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testBrokerIdConfigChangeAndDelete(quorum: String): Unit = {
@Test
def testBrokerIdConfigChangeAndDelete(): Unit = {
val newValue: Long = 100000L
val brokerId: String = this.brokers.head.config.brokerId.toString
setBrokerConfigs(brokerId, newValue)
@ -370,9 +355,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDefaultBrokerIdConfigChangeAndDelete(quorum: String): Unit = {
@Test
def testDefaultBrokerIdConfigChangeAndDelete(): Unit = {
val newValue: Long = 100000L
val brokerId: String = ""
setBrokerConfigs(brokerId, newValue)
@ -393,9 +377,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDefaultAndBrokerIdConfigChange(quorum: String): Unit = {
@Test
def testDefaultAndBrokerIdConfigChange(): Unit = {
val newValue: Long = 100000L
val brokerId: String = this.brokers.head.config.brokerId.toString
setBrokerConfigs(brokerId, newValue)
@ -411,9 +394,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDynamicGroupConfigChange(quorum: String): Unit = {
@Test
def testDynamicGroupConfigChange(): Unit = {
val newSessionTimeoutMs = 50000
val consumerGroupId = "group-foo"
val admin = createAdminClient()
@ -438,9 +420,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
assertEquals(newSessionTimeoutMs, groupConfig.consumerSessionTimeoutMs())
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
def testDynamicShareGroupConfigChange(quorum: String): Unit = {
@Test
def testDynamicShareGroupConfigChange(): Unit = {
val newRecordLockDurationMs = 50000
val shareGroupId = "group-foo"
val admin = createAdminClient()
@ -465,9 +446,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
assertEquals(newRecordLockDurationMs, groupConfig.shareRecordLockDurationMs)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testIncrementalAlterDefaultGroupConfig(quorum: String): Unit = {
@Test
def testIncrementalAlterDefaultGroupConfig(): Unit = {
val admin = createAdminClient()
try {
val resource = new ConfigResource(ConfigResource.Type.GROUP, "")

View File

@ -36,8 +36,7 @@ import org.apache.kafka.common.utils.ByteUtils
import org.apache.kafka.common.{TopicPartition, Uuid, requests}
import org.apache.kafka.server.config.ServerLogConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.Test
import scala.jdk.CollectionConverters._
@ -118,9 +117,8 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testProduceRequestWithNullClientId(quorum: String): Unit = {
@Test
def testProduceRequestWithNullClientId(): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition(topic, 0)
val correlationId = -1
@ -166,27 +164,23 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode), "There should be no error")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testHeaderOnlyRequest(quorum: String): Unit = {
@Test
def testHeaderOnlyRequest(): Unit = {
verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, 1))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testInvalidApiKeyRequest(quorum: String): Unit = {
@Test
def testInvalidApiKeyRequest(): Unit = {
verifyDisconnect(requestHeaderBytes(-1, 0))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testInvalidApiVersionRequest(quorum: String): Unit = {
@Test
def testInvalidApiVersionRequest(): Unit = {
verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, -1))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testMalformedHeaderRequest(quorum: String): Unit = {
@Test
def testMalformedHeaderRequest(): Unit = {
val serializedBytes = {
// Only send apiKey and apiVersion
val buffer = ByteBuffer.allocate(

View File

@ -25,9 +25,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.util.{Optional, Properties}
import scala.jdk.CollectionConverters._
@ -104,9 +102,8 @@ class FetchRequestMaxBytesTest extends BaseRequestTest {
* Note that when a single batch is larger than FetchMaxBytes, it will be
* returned in full even if this is larger than FetchMaxBytes. See KIP-74.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumeMultipleRecords(quorum: String): Unit = {
@Test
def testConsumeMultipleRecords(): Unit = {
createTopics()
expectNextRecords(IndexedSeq(messages(0), messages(1)), 0)

View File

@ -26,8 +26,7 @@ import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.record.BrokerCompressionType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.Test
import java.util
import java.util.Optional
@ -41,9 +40,8 @@ import scala.util.Random
*/
class FetchRequestTest extends BaseFetchRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testBrokerRespectsPartitionsOrderAndSizeLimits(quorum: String): Unit = {
@Test
def testBrokerRespectsPartitionsOrderAndSizeLimits(): Unit = {
initProducer()
val messagesPerPartition = 9
@ -144,9 +142,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
evaluateResponse4(fetchResponse4V12, 12)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testFetchRequestV4WithReadCommitted(quorum: String): Unit = {
@Test
def testFetchRequestV4WithReadCommitted(): Unit = {
initProducer()
val maxPartitionBytes = 200
val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head
@ -163,9 +160,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertTrue(records(partitionData).map(_.sizeInBytes).sum > 0)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testFetchRequestToNonReplica(quorum: String): Unit = {
@Test
def testFetchRequestToNonReplica(): Unit = {
val topic = "topic"
val partition = 0
val topicPartition = new TopicPartition(topic, partition)
@ -194,15 +190,13 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, oldPartitionData.errorCode)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testLastFetchedEpochValidation(quorum: String): Unit = {
@Test
def testLastFetchedEpochValidation(): Unit = {
checkLastFetchedEpochValidation(ApiKeys.FETCH.latestVersion())
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testLastFetchedEpochValidationV12(quorum: String): Unit = {
@Test
def testLastFetchedEpochValidationV12(): Unit = {
checkLastFetchedEpochValidation(12)
}
@ -249,15 +243,13 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(firstEpochEndOffset, divergingEpoch.endOffset)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCurrentEpochValidation(quorum: String): Unit = {
@Test
def testCurrentEpochValidation(): Unit = {
checkCurrentEpochValidation(ApiKeys.FETCH.latestVersion())
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCurrentEpochValidationV12(quorum: String): Unit = {
@Test
def testCurrentEpochValidationV12(): Unit = {
checkCurrentEpochValidation(12)
}
@ -299,15 +291,13 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testEpochValidationWithinFetchSession(quorum: String): Unit = {
@Test
def testEpochValidationWithinFetchSession(): Unit = {
checkEpochValidationWithinFetchSession(ApiKeys.FETCH.latestVersion())
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testEpochValidationWithinFetchSessionV12(quorum: String): Unit = {
@Test
def testEpochValidationWithinFetchSessionV12(): Unit = {
checkEpochValidationWithinFetchSession(12)
}
@ -367,9 +357,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
* those partitions are returned in all incremental fetch requests.
* This tests using FetchRequests that don't use topic IDs
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateIncrementalFetchWithPartitionsInErrorV12(quorum: String): Unit = {
@Test
def testCreateIncrementalFetchWithPartitionsInErrorV12(): Unit = {
def createConsumerFetchRequest(topicPartitions: Seq[TopicPartition],
metadata: JFetchMetadata,
toForget: Seq[TopicIdPartition]): FetchRequest =
@ -430,9 +419,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
/**
* Test that when a Fetch Request receives an unknown topic ID, it returns a top level error.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testFetchWithPartitionsWithIdError(quorum: String): Unit = {
@Test
def testFetchWithPartitionsWithIdError(): Unit = {
def createConsumerFetchRequest(fetchData: util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData],
metadata: JFetchMetadata,
toForget: Seq[TopicIdPartition]): FetchRequest = {
@ -475,9 +463,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(Errors.UNKNOWN_TOPIC_ID.code, responseData1.get(bar0).errorCode)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testZStdCompressedTopic(quorum: String): Unit = {
@Test
def testZStdCompressedTopic(): Unit = {
// ZSTD compressed topic
val topicConfig = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> BrokerCompressionType.ZSTD.name)
val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1, configs = topicConfig).head
@ -523,9 +510,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(3, records(data2).size)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testZStdCompressedRecords(quorum: String): Unit = {
@Test
def testZStdCompressedRecords(): Unit = {
// Producer compressed topic
val topicConfig = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> BrokerCompressionType.PRODUCER.name)
val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1, configs = topicConfig).head

View File

@ -24,9 +24,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.net.Socket
import java.util.concurrent.atomic.AtomicInteger
@ -63,9 +61,8 @@ class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest {
super.tearDown()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testBothReportersAreInvoked(quorum: String): Unit = {
@Test
def testBothReportersAreInvoked(): Unit = {
val port = anySocketServer.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
val socket = new Socket("localhost", port)
socket.setSoTimeout(10000)

View File

@ -23,10 +23,8 @@ import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsRepo
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
object KafkaMetricsReporterTest {
@ -78,9 +76,8 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
broker.startup()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testMetricsContextNamespacePresent(quorum: String): Unit = {
@Test
def testMetricsContextNamespacePresent(): Unit = {
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get())
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())

View File

@ -24,8 +24,7 @@ import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.Test
import java.util.{Optional, Properties}
import scala.collection.Seq
@ -43,9 +42,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testListOffsetsErrorCodes(quorum: String): Unit = {
@Test
def testListOffsetsErrorCodes(): Unit = {
val targetTimes = List(new ListOffsetsTopic()
.setName(topic)
.setPartitions(List(new ListOffsetsPartition()
@ -108,9 +106,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
assertResponseError(error, brokerId, request)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCurrentEpochValidation(quorum: String): Unit = {
@Test
def testCurrentEpochValidation(): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition(topic, 0)
val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 3)
@ -168,9 +165,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
(partitionData.offset, partitionData.leaderEpoch, partitionData.errorCode())
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testResponseIncludesLeaderEpoch(quorum: String): Unit = {
@Test
def testResponseIncludesLeaderEpoch(): Unit = {
val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 3)
val firstLeaderId = partitionToLeader(partition.partition)
@ -209,9 +205,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
assertEquals((9L, firstLeaderEpoch, Errors.NONE.code), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testResponseDefaultOffsetAndLeaderEpochForAllVersions(quorum: String): Unit = {
@Test
def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = {
val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 3)
val firstLeaderId = partitionToLeader(partition.partition)

View File

@ -26,9 +26,7 @@ import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffset
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.storage.internals.log.{LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{Test, Timeout}
import java.io.File
import java.util.{Optional, Properties, Random}
@ -47,9 +45,8 @@ class LogOffsetTest extends BaseRequestTest {
props.put("log.retention.check.interval.ms", (5 * 1000 * 60).toString)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testGetOffsetsForUnknownTopic(quorum: String): Unit = {
@Test
def testGetOffsetsForUnknownTopic(): Unit = {
val topicPartition = new TopicPartition("foo", 0)
val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP).asJava).build(1)
@ -58,9 +55,8 @@ class LogOffsetTest extends BaseRequestTest {
}
@deprecated("ListOffsetsRequest V0", since = "")
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testGetOffsetsAfterDeleteRecords(quorum: String): Unit = {
@Test
def testGetOffsetsAfterDeleteRecords(): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
val log = createTopicAndGetLog(topic, topicPartition)
@ -84,9 +80,8 @@ class LogOffsetTest extends BaseRequestTest {
assertEquals(20L, consumerOffset)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(quorum: String): Unit = {
@Test
def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
val log = createTopicAndGetLog(topic, topicPartition)
@ -106,9 +101,8 @@ class LogOffsetTest extends BaseRequestTest {
assertEquals(Optional.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP, Optional.empty).timestampAndOffsetOpt)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(quorum: String): Unit = {
@Test
def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
val log = createTopicAndGetLog(topic, topicPartition)
@ -125,9 +119,8 @@ class LogOffsetTest extends BaseRequestTest {
assertEquals(6L, maxTimestampOffset.get.timestamp)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testGetOffsetsBeforeLatestTime(quorum: String): Unit = {
@Test
def testGetOffsetsBeforeLatestTime(): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
val log = createTopicAndGetLog(topic, topicPartition)
@ -158,9 +151,8 @@ class LogOffsetTest extends BaseRequestTest {
assertFalse(FetchResponse.recordsOrFail(fetchResponse.responseData(topicNames, ApiKeys.FETCH.latestVersion).get(topicPartition)).batches.iterator.hasNext)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testEmptyLogsGetOffsets(quorum: String): Unit = {
@Test
def testEmptyLogsGetOffsets(): Unit = {
val random = new Random
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, random.nextInt(10))
@ -182,9 +174,8 @@ class LogOffsetTest extends BaseRequestTest {
assertFalse(offsetChanged)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(quorum: String): Unit = {
@Test
def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
val log = createTopicAndGetLog(topic, topicPartition)
@ -195,9 +186,8 @@ class LogOffsetTest extends BaseRequestTest {
assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP, Optional.empty))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testGetOffsetsBeforeEarliestTime(quorum: String): Unit = {
@Test
def testGetOffsetsBeforeEarliestTime(): Unit = {
val random = new Random
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, random.nextInt(3))

View File

@ -27,9 +27,7 @@ import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerialize
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.io.File
import java.util.Properties
@ -104,9 +102,8 @@ class LogRecoveryTest extends QuorumTestHarness {
super.tearDown()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testHWCheckpointNoFailuresSingleLogSegment(quorum: String): Unit = {
@Test
def testHWCheckpointNoFailuresSingleLogSegment(): Unit = {
val numMessages = 2L
sendMessages(numMessages.toInt)
@ -122,9 +119,8 @@ class LogRecoveryTest extends QuorumTestHarness {
assertEquals(numMessages, followerHW)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testHWCheckpointWithFailuresSingleLogSegment(quorum: String): Unit = {
@Test
def testHWCheckpointWithFailuresSingleLogSegment(): Unit = {
var leader = getLeaderIdForPartition(servers, topicPartition)
assertEquals(0L, hwFile1.read().getOrDefault(topicPartition, 0L))
@ -183,9 +179,8 @@ class LogRecoveryTest extends QuorumTestHarness {
assertEquals(hw, hwFile2.read().getOrDefault(topicPartition, 0L))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testHWCheckpointNoFailuresMultipleLogSegments(quorum: String): Unit = {
@Test
def testHWCheckpointNoFailuresMultipleLogSegments(): Unit = {
sendMessages(20)
val hw = 20L
// give some time for follower 1 to record leader HW of 600
@ -200,9 +195,8 @@ class LogRecoveryTest extends QuorumTestHarness {
assertEquals(hw, followerHW)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testHWCheckpointWithFailuresMultipleLogSegments(quorum: String): Unit = {
@Test
def testHWCheckpointWithFailuresMultipleLogSegments(): Unit = {
var leader = getLeaderIdForPartition(servers, topicPartition)
sendMessages(2)

View File

@ -27,9 +27,7 @@ import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.test.TestUtils.isValidClusterId
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@ -41,24 +39,21 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
doSetup(testInfo, createOffsetsTopic = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testClusterIdWithRequestVersion1(quorum: String): Unit = {
@Test
def testClusterIdWithRequestVersion1(): Unit = {
val v1MetadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
val v1ClusterId = v1MetadataResponse.clusterId
assertNull(v1ClusterId, s"v1 clusterId should be null")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testClusterIdIsValid(quorum: String): Unit = {
@Test
def testClusterIdIsValid(): Unit = {
val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(4.toShort))
isValidClusterId(metadataResponse.clusterId)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testRack(quorum: String): Unit = {
@Test
def testRack(): Unit = {
val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(4.toShort))
// Validate rack matches what's set in generateConfigs() above
metadataResponse.brokers.forEach { broker =>
@ -66,9 +61,8 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testIsInternal(quorum: String): Unit = {
@Test
def testIsInternal(): Unit = {
val internalTopic = Topic.GROUP_METADATA_TOPIC_NAME
val notInternalTopic = "notInternal"
// create the topics
@ -88,9 +82,8 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
assertEquals(Set(internalTopic).asJava, metadataResponse.buildCluster().internalTopics)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testNoTopicsRequest(quorum: String): Unit = {
@Test
def testNoTopicsRequest(): Unit = {
// create some topics
createTopic("t1", 3, 2)
createTopic("t2", 3, 2)
@ -100,9 +93,8 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
assertTrue(metadataResponse.topicMetadata.isEmpty, "Response should have no topics")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAutoTopicCreation(quorum: String): Unit = {
@Test
def testAutoTopicCreation(): Unit = {
val topic1 = "t1"
val topic2 = "t2"
val topic3 = "t3"
@ -128,9 +120,8 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic5))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAutoCreateTopicWithInvalidReplicationFactor(quorum: String): Unit = {
@Test
def testAutoCreateTopicWithInvalidReplicationFactor(): Unit = {
// Shutdown all but one broker so that the number of brokers is less than the default replication factor
brokers.tail.foreach(_.shutdown())
brokers.tail.foreach(_.awaitShutdown())
@ -144,9 +135,8 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
assertEquals(0, topicMetadata.partitionMetadata.size)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAllTopicsRequest(quorum: String): Unit = {
@Test
def testAllTopicsRequest(): Unit = {
// create some topics
createTopic("t1", 3, 2)
createTopic("t2", 3, 2)
@ -162,9 +152,8 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
assertEquals(2, metadataResponseV1.topicMetadata.size(), "V1 Response should have 2 (all) topics")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testTopicIdsInResponse(quorum: String): Unit = {
@Test
def testTopicIdsInResponse(): Unit = {
val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
val topic1 = "topic1"
val topic2 = "topic2"
@ -192,9 +181,8 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
/**
* Preferred replica should be the first item in the replicas list
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testPreferredReplica(quorum: String): Unit = {
@Test
def testPreferredReplica(): Unit = {
val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
createTopicWithAssignment("t1", replicaAssignment)
// Test metadata on two different brokers to ensure that metadata propagation works correctly
@ -216,9 +204,8 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testReplicaDownResponse(quorum: String): Unit = {
@Test
def testReplicaDownResponse(): Unit = {
val replicaDownTopic = "replicaDown"
val replicaCount = 3
@ -262,9 +249,8 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
assertEquals(replicaCount, v1PartitionMetadata.replicaIds.size, s"Response should have $replicaCount replicas")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testIsrAfterBrokerShutDownAndJoinsBack(quorum: String): Unit = {
@Test
def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
def checkIsr[B <: KafkaBroker](
brokers: Seq[B],
topic: String
@ -300,9 +286,8 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
checkIsr(brokers, topic)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAliveBrokersWithNoTopics(quorum: String): Unit = {
@Test
def testAliveBrokersWithNoTopics(): Unit = {
def checkMetadata[B <: KafkaBroker](
brokers: Seq[B],
expectedBrokersCount: Int

View File

@ -26,16 +26,14 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.Test
import scala.jdk.CollectionConverters._
class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testOffsetsForLeaderEpochErrorCodes(quorum: String): Unit = {
@Test
def testOffsetsForLeaderEpochErrorCodes(): Unit = {
val topic = "topic"
val partition = new TopicPartition(topic, 0)
val epochs = offsetForLeaderTopicCollectionFor(partition, 0, RecordBatch.NO_PARTITION_LEADER_EPOCH)
@ -57,9 +55,8 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, request)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCurrentEpochValidation(quorum: String): Unit = {
@Test
def testCurrentEpochValidation(): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition(topic, 0)
val partitionToLeader = createTopic(topic, replicationFactor = 3)

View File

@ -32,9 +32,9 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import org.junit.jupiter.params.provider.ValueSource
import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._
@ -47,9 +47,8 @@ class ProduceRequestTest extends BaseRequestTest {
val metricsKeySet = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testSimpleProduceRequest(quorum: String): Unit = {
@Test
def testSimpleProduceRequest(): Unit = {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): Unit = {
@ -163,9 +162,8 @@ class ProduceRequestTest extends BaseRequestTest {
assertEquals("One or more records have been rejected due to invalid timestamp", partitionProduceResponse.errorMessage)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testProduceToNonReplica(quorum: String): Unit = {
@Test
def testProduceToNonReplica(): Unit = {
val topic = "topic"
val partition = 0
@ -212,9 +210,8 @@ class ProduceRequestTest extends BaseRequestTest {
}.getOrElse(throw new AssertionError(s"No leader elected for topic $topic"))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCorruptLz4ProduceRequest(quorum: String): Unit = {
@Test
def testCorruptLz4ProduceRequest(): Unit = {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
val topicId = getTopicIds().get("topic").get
val timestamp = 1000000
@ -247,9 +244,8 @@ class ProduceRequestTest extends BaseRequestTest {
assertTrue(TestUtils.meterCount(s"${BrokerTopicMetrics.INVALID_MESSAGE_CRC_RECORDS_PER_SEC}") > 0)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testZSTDProduceRequest(quorum: String): Unit = {
@Test
def testZSTDProduceRequest(): Unit = {
val topic = "topic"
val partition = 0

View File

@ -17,15 +17,13 @@
package kafka.server
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.{AfterEach, Test}
import kafka.utils.TestUtils
import TestUtils._
import kafka.api.IntegrationTestHarness
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
class ReplicaFetchTest extends IntegrationTestHarness {
val topic1 = "foo"
@ -39,9 +37,8 @@ class ReplicaFetchTest extends IntegrationTestHarness {
override def brokerCount: Int = 2
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testReplicaFetcherThread(quorum: String): Unit = {
@Test
def testReplicaFetcherThread(): Unit = {
val partition = 0
val testMessageList1 = List("test1", "test2", "test3", "test4")
val testMessageList2 = List("test5", "test6", "test7", "test8")

View File

@ -37,9 +37,7 @@ import org.apache.kafka.server.common.{Feature, MetadataVersion}
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.server.quota.QuotaType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{AfterEach, Test}
import scala.jdk.CollectionConverters._
import scala.util.Using
@ -67,15 +65,13 @@ class ReplicationQuotasTest extends QuorumTestHarness {
super.tearDown()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def shouldBootstrapTwoBrokersWithLeaderThrottle(quorum: String): Unit = {
@Test
def shouldBootstrapTwoBrokersWithLeaderThrottle(): Unit = {
shouldMatchQuotaReplicatingThroughAnAsymmetricTopology(true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def shouldBootstrapTwoBrokersWithFollowerThrottle(quorum: String): Unit = {
@Test
def shouldBootstrapTwoBrokersWithFollowerThrottle(): Unit = {
shouldMatchQuotaReplicatingThroughAnAsymmetricTopology(false)
}
@ -194,9 +190,8 @@ class ReplicationQuotasTest extends QuorumTestHarness {
def tp(partition: Int): TopicPartition = new TopicPartition(topic, partition)
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def shouldThrottleOldSegments(quorum: String): Unit = {
@Test
def shouldThrottleOldSegments(): Unit = {
/**
* Simple test which ensures throttled replication works when the dataset spans many segments
*/

View File

@ -43,9 +43,7 @@ import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, A
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs}
import org.apache.kafka.server.quota.QuotaType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.net.InetAddress
import java.util
@ -133,32 +131,28 @@ class RequestQuotaTest extends BaseRequestTest {
finally super.tearDown()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testResponseThrottleTime(quorum: String): Unit = {
@Test
def testResponseThrottleTime(): Unit = {
for (apiKey <- clientActions ++ clusterActionsWithThrottleForBroker)
submitTest(apiKey, () => checkRequestThrottleTime(apiKey))
waitAndCheckResults()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated(quorum: String): Unit = {
@Test
def testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated(): Unit = {
submitTest(ApiKeys.PRODUCE, () => checkSmallQuotaProducerRequestThrottleTime())
waitAndCheckResults()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(quorum: String): Unit = {
@Test
def testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(): Unit = {
submitTest(ApiKeys.FETCH, () => checkSmallQuotaConsumerRequestThrottleTime())
waitAndCheckResults()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUnthrottledClient(quorum: String): Unit = {
@Test
def testUnthrottledClient(): Unit = {
for (apiKey <- clientActions) {
submitTest(apiKey, () => checkUnthrottledClient(apiKey))
}
@ -166,9 +160,8 @@ class RequestQuotaTest extends BaseRequestTest {
waitAndCheckResults()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testExemptRequestTime(quorum: String): Unit = {
@Test
def testExemptRequestTime(): Unit = {
// Exclude `DESCRIBE_QUORUM`, maybe it shouldn't be a cluster action
val actions = clusterActions -- clusterActionsWithThrottleForBroker -- RequestQuotaTest.Envelope -- RequestQuotaTest.ShareGroupState - ApiKeys.DESCRIBE_QUORUM
for (apiKey <- actions) {
@ -178,9 +171,8 @@ class RequestQuotaTest extends BaseRequestTest {
waitAndCheckResults()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testUnauthorizedThrottle(quorum: String): Unit = {
@Test
def testUnauthorizedThrottle(): Unit = {
RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
val apiKeys = ApiKeys.brokerApis

View File

@ -29,11 +29,11 @@ import org.apache.kafka.common.utils.Exit
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
import org.apache.kafka.storage.internals.log.LogManager
import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{MethodSource, ValueSource}
import org.junit.jupiter.params.provider.MethodSource
import java.time.Duration
import java.util.Properties
@ -134,18 +134,16 @@ class ServerShutdownTest extends KafkaServerTestHarness {
producer.close()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCleanShutdownAfterFailedStartup(quorum: String): Unit = {
@Test
def testCleanShutdownAfterFailedStartup(): Unit = {
propsToChangeUponRestart.setProperty(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG, "1000")
shutdownBroker()
shutdownKRaftController()
verifyCleanShutdownAfterFailedStartup[CancellationException]
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
@Test
def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(): Unit = {
createTopic(topic)
shutdownBroker()
config.logDirs.foreach { dirName =>
@ -174,9 +172,8 @@ class ServerShutdownTest extends KafkaServerTestHarness {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testShutdownWithKRaftControllerUnavailable(quorum: String): Unit = {
@Test
def testShutdownWithKRaftControllerUnavailable(): Unit = {
shutdownKRaftController()
killBroker(0, Duration.ofSeconds(1))
CoreUtils.delete(broker.config.logDirs)
@ -220,9 +217,8 @@ class ServerShutdownTest extends KafkaServerTestHarness {
.count(isNonDaemonKafkaThread))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsecutiveShutdown(quorum: String): Unit = {
@Test
def testConsecutiveShutdown(): Unit = {
shutdownBroker()
brokers.head.shutdown()
}

View File

@ -34,10 +34,8 @@ import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.collection.mutable.ListBuffer
import scala.collection.{Map, Seq}
@ -64,9 +62,8 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
super.tearDown()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader(quorum: String): Unit = {
@Test
def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader(): Unit = {
brokers ++= (0 to 1).map { id => createBroker(fromProps(createBrokerConfig(id))) }
// Given two topics with replication of a single partition
@ -97,9 +94,8 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
waitUntilTrue(() => messagesHaveLeaderEpoch(brokers(0), expectedLeaderEpoch, 4), "Leader epoch should be 1")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def shouldSendLeaderEpochRequestAndGetAResponse(quorum: String): Unit = {
@Test
def shouldSendLeaderEpochRequestAndGetAResponse(): Unit = {
//3 brokers, put partition on 100/101 and then pretend to be 102
brokers ++= (100 to 102).map { id => createBroker(fromProps(createBrokerConfig(id))) }
@ -145,9 +141,8 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
fetcher1.close()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def shouldIncreaseLeaderEpochBetweenLeaderRestarts(quorum: String): Unit = {
@Test
def shouldIncreaseLeaderEpochBetweenLeaderRestarts(): Unit = {
//Setup: we are only interested in the single partition on broker 101
brokers += createBroker(fromProps(createBrokerConfig(100)))
assertEquals(controllerServer.config.nodeId, waitUntilQuorumLeaderElected(controllerServer))

View File

@ -21,8 +21,7 @@ import kafka.api.AbstractAuthorizerIntegrationTest;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
@ -35,9 +34,8 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.fail;
public class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
@ParameterizedTest
@ValueSource(strings = {"kraft"})
public void testDescribeGroupCliWithGroupDescribe(String quorum) throws Exception {
@Test
public void testDescribeGroupCliWithGroupDescribe() throws Exception {
addAndVerifyAcls(CollectionConverters.asScala(Collections.singleton(new AccessControlEntry(ClientPrincipal().toString(), "*", DESCRIBE, ALLOW))).toSet(), groupResource());
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group()};