mirror of https://github.com/apache/kafka.git
MINOR: Default test name added to tools (#15666)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
6de58d2731
commit
31e8a7fb04
|
@ -45,8 +45,6 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public class ToolsTestUtils {
|
public class ToolsTestUtils {
|
||||||
/** @see TestInfoUtils#TestWithParameterizedQuorumName() */
|
|
||||||
public static final String TEST_WITH_PARAMETERIZED_QUORUM_NAME = "{displayName}.{argumentsWithNames}";
|
|
||||||
/** @see TestInfoUtils#TestWithParameterizedQuorumAndGroupProtocolNames() */
|
/** @see TestInfoUtils#TestWithParameterizedQuorumAndGroupProtocolNames() */
|
||||||
public static final String TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES = "{displayName}.quorum={0}.groupProtocol={1}";
|
public static final String TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES = "{displayName}.quorum={0}.groupProtocol={1}";
|
||||||
|
|
||||||
|
|
|
@ -151,7 +151,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
adminClient.close();
|
adminClient.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCreate(String quorum) throws Exception {
|
public void testCreate(String quorum) throws Exception {
|
||||||
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, 1,
|
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, 1,
|
||||||
|
@ -161,7 +161,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
"Admin client didn't see the created topic. It saw: " + adminClient.listTopics().names().get());
|
"Admin client didn't see the created topic. It saw: " + adminClient.listTopics().names().get());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCreateWithDefaults(String quorum) throws Exception {
|
public void testCreateWithDefaults(String quorum) throws Exception {
|
||||||
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
|
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
|
||||||
|
@ -177,7 +177,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size());
|
assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCreateWithDefaultReplication(String quorum) throws Exception {
|
public void testCreateWithDefaultReplication(String quorum) throws Exception {
|
||||||
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, defaultReplicationFactor,
|
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, defaultReplicationFactor,
|
||||||
|
@ -193,7 +193,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size());
|
assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCreateWithDefaultPartitions(String quorum) throws Exception {
|
public void testCreateWithDefaultPartitions(String quorum) throws Exception {
|
||||||
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, 2,
|
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, 2,
|
||||||
|
@ -210,7 +210,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertEquals(2, (short) partitions.get(0).replicas().size(), "Partitions not replicated: " + partitions.get(0).replicas().size());
|
assertEquals(2, (short) partitions.get(0).replicas().size(), "Partitions not replicated: " + partitions.get(0).replicas().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCreateWithConfigs(String quorum) throws Exception {
|
public void testCreateWithConfigs(String quorum) throws Exception {
|
||||||
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
|
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
|
||||||
|
@ -226,9 +226,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
"Config not set correctly: " + configs.get("delete.retention.ms").value());
|
"Config not set correctly: " + configs.get("delete.retention.ms").value());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCreateWhenAlreadyExists(String quorum) throws Exception {
|
public void testCreateWhenAlreadyExists(String quorum) {
|
||||||
// create the topic
|
// create the topic
|
||||||
TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap(
|
TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap(
|
||||||
"--create", "--partitions", Integer.toString(defaultNumPartitions), "--replication-factor", "1",
|
"--create", "--partitions", Integer.toString(defaultNumPartitions), "--replication-factor", "1",
|
||||||
|
@ -242,7 +242,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
"Expected TopicExistsException to throw");
|
"Expected TopicExistsException to throw");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCreateWhenAlreadyExistsWithIfNotExists(String quorum) throws Exception {
|
public void testCreateWhenAlreadyExistsWithIfNotExists(String quorum) throws Exception {
|
||||||
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
|
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
|
||||||
|
@ -253,7 +253,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
topicService.createTopic(createOpts);
|
topicService.createTopic(createOpts);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCreateWithReplicaAssignment(String quorum) throws Exception {
|
public void testCreateWithReplicaAssignment(String quorum) throws Exception {
|
||||||
scala.collection.mutable.HashMap<Object, Seq<Object>> replicaAssignmentMap = new scala.collection.mutable.HashMap<>();
|
scala.collection.mutable.HashMap<Object, Seq<Object>> replicaAssignmentMap = new scala.collection.mutable.HashMap<>();
|
||||||
|
@ -287,7 +287,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
return partitions.get(partitionNumber).replicas().stream().map(Node::id).collect(Collectors.toList());
|
return partitions.get(partitionNumber).replicas().stream().map(Node::id).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCreateWithInvalidReplicationFactor(String quorum) {
|
public void testCreateWithInvalidReplicationFactor(String quorum) {
|
||||||
TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", "--replication-factor", Integer.toString(Short.MAX_VALUE + 1),
|
TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", "--replication-factor", Integer.toString(Short.MAX_VALUE + 1),
|
||||||
|
@ -295,7 +295,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
|
assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCreateWithNegativeReplicationFactor(String quorum) {
|
public void testCreateWithNegativeReplicationFactor(String quorum) {
|
||||||
TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create",
|
TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create",
|
||||||
|
@ -303,14 +303,14 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
|
assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCreateWithNegativePartitionCount(String quorum) {
|
public void testCreateWithNegativePartitionCount(String quorum) {
|
||||||
TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName);
|
TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName);
|
||||||
assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
|
assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testInvalidTopicLevelConfig(String quorum) {
|
public void testInvalidTopicLevelConfig(String quorum) {
|
||||||
TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create",
|
TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create",
|
||||||
|
@ -319,9 +319,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertThrows(ConfigException.class, () -> topicService.createTopic(createOpts), "Expected ConfigException to throw");
|
assertThrows(ConfigException.class, () -> topicService.createTopic(createOpts), "Expected ConfigException to throw");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testListTopics(String quorum) throws Exception {
|
public void testListTopics(String quorum) {
|
||||||
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
|
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
|
||||||
scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
|
scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
|
||||||
);
|
);
|
||||||
|
@ -330,9 +330,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertTrue(output.contains(testTopicName), "Expected topic name to be present in output: " + output);
|
assertTrue(output.contains(testTopicName), "Expected topic name to be present in output: " + output);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testListTopicsWithIncludeList(String quorum) throws ExecutionException, InterruptedException {
|
public void testListTopicsWithIncludeList(String quorum) {
|
||||||
String topic1 = "kafka.testTopic1";
|
String topic1 = "kafka.testTopic1";
|
||||||
String topic2 = "kafka.testTopic2";
|
String topic2 = "kafka.testTopic2";
|
||||||
String topic3 = "oooof.testTopic1";
|
String topic3 = "oooof.testTopic1";
|
||||||
|
@ -355,9 +355,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertFalse(output.contains(topic3), "Do not expect topic name " + topic3 + " to be present in output: " + output);
|
assertFalse(output.contains(topic3), "Do not expect topic name " + topic3 + " to be present in output: " + output);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testListTopicsWithExcludeInternal(String quorum) throws ExecutionException, InterruptedException {
|
public void testListTopicsWithExcludeInternal(String quorum) {
|
||||||
String topic1 = "kafka.testTopic1";
|
String topic1 = "kafka.testTopic1";
|
||||||
String hiddenConsumerTopic = Topic.GROUP_METADATA_TOPIC_NAME;
|
String hiddenConsumerTopic = Topic.GROUP_METADATA_TOPIC_NAME;
|
||||||
int partition = 2;
|
int partition = 2;
|
||||||
|
@ -375,7 +375,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertFalse(output.contains(hiddenConsumerTopic), "Do not expect topic name " + hiddenConsumerTopic + " to be present in output: " + output);
|
assertFalse(output.contains(hiddenConsumerTopic), "Do not expect topic name " + hiddenConsumerTopic + " to be present in output: " + output);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testAlterPartitionCount(String quorum) throws ExecutionException, InterruptedException {
|
public void testAlterPartitionCount(String quorum) throws ExecutionException, InterruptedException {
|
||||||
int partition = 2;
|
int partition = 2;
|
||||||
|
@ -393,7 +393,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertEquals(3, topicDescription.partitions().size(), "Expected partition count to be 3. Got: " + topicDescription.partitions().size());
|
assertEquals(3, topicDescription.partitions().size(), "Expected partition count to be 3. Got: " + topicDescription.partitions().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testAlterAssignment(String quorum) throws ExecutionException, InterruptedException {
|
public void testAlterAssignment(String quorum) throws ExecutionException, InterruptedException {
|
||||||
int partition = 2;
|
int partition = 2;
|
||||||
|
@ -416,9 +416,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertEquals(Arrays.asList(4, 2), partitionReplicas, "Expected to have replicas 4,2. Got: " + partitionReplicas);
|
assertEquals(Arrays.asList(4, 2), partitionReplicas, "Expected to have replicas 4,2. Got: " + partitionReplicas);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testAlterAssignmentWithMoreAssignmentThanPartitions(String quorum) throws ExecutionException, InterruptedException {
|
public void testAlterAssignmentWithMoreAssignmentThanPartitions(String quorum) {
|
||||||
int partition = 2;
|
int partition = 2;
|
||||||
short replicationFactor = 2;
|
short replicationFactor = 2;
|
||||||
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor,
|
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor,
|
||||||
|
@ -430,9 +430,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
"Expected to fail with ExecutionException");
|
"Expected to fail with ExecutionException");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testAlterAssignmentWithMorePartitionsThanAssignment(String quorum) throws ExecutionException, InterruptedException {
|
public void testAlterAssignmentWithMorePartitionsThanAssignment(String quorum) {
|
||||||
int partition = 2;
|
int partition = 2;
|
||||||
short replicationFactor = 2;
|
short replicationFactor = 2;
|
||||||
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor,
|
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor,
|
||||||
|
@ -445,9 +445,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
"Expected to fail with ExecutionException");
|
"Expected to fail with ExecutionException");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testAlterWithInvalidPartitionCount(String quorum) throws Exception {
|
public void testAlterWithInvalidPartitionCount(String quorum) {
|
||||||
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
|
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
|
||||||
scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
|
scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
|
||||||
);
|
);
|
||||||
|
@ -456,7 +456,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
"Expected to fail with ExecutionException");
|
"Expected to fail with ExecutionException");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testAlterWhenTopicDoesntExist(String quorum) {
|
public void testAlterWhenTopicDoesntExist(String quorum) {
|
||||||
// alter a topic that does not exist without --if-exists
|
// alter a topic that does not exist without --if-exists
|
||||||
|
@ -465,16 +465,16 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertThrows(IllegalArgumentException.class, () -> topicService.alterTopic(alterOpts), "Expected to fail with IllegalArgumentException");
|
assertThrows(IllegalArgumentException.class, () -> topicService.alterTopic(alterOpts), "Expected to fail with IllegalArgumentException");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testAlterWhenTopicDoesntExistWithIfExists(String quorum) throws ExecutionException, InterruptedException {
|
public void testAlterWhenTopicDoesntExistWithIfExists(String quorum) throws ExecutionException, InterruptedException {
|
||||||
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, "--partitions", "1", "--if-exists"));
|
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, "--partitions", "1", "--if-exists"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCreateAlterTopicWithRackAware(String quorum) throws Exception {
|
public void testCreateAlterTopicWithRackAware(String quorum) throws Exception {
|
||||||
Map<Integer, String> rackInfo = new HashMap<Integer, String>();
|
Map<Integer, String> rackInfo = new HashMap<>();
|
||||||
rackInfo.put(0, "rack1");
|
rackInfo.put(0, "rack1");
|
||||||
rackInfo.put(1, "rack2");
|
rackInfo.put(1, "rack2");
|
||||||
rackInfo.put(2, "rack2");
|
rackInfo.put(2, "rack2");
|
||||||
|
@ -517,7 +517,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
true, true, true);
|
true, true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testConfigPreservationAcrossPartitionAlteration(String quorum) throws Exception {
|
public void testConfigPreservationAcrossPartitionAlteration(String quorum) throws Exception {
|
||||||
String cleanUpPolicy = "compact";
|
String cleanUpPolicy = "compact";
|
||||||
|
@ -550,7 +550,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertEquals(cleanUpPolicy, newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Updated properties have incorrect value");
|
assertEquals(cleanUpPolicy, newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Updated properties have incorrect value");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testTopicDeletion(String quorum) throws Exception {
|
public void testTopicDeletion(String quorum) throws Exception {
|
||||||
// create the NormalTopic
|
// create the NormalTopic
|
||||||
|
@ -568,7 +568,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
TestUtils.verifyTopicDeletion(zkClientOrNull(), testTopicName, 1, brokers());
|
TestUtils.verifyTopicDeletion(zkClientOrNull(), testTopicName, 1, brokers());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testTopicWithCollidingCharDeletionAndCreateAgain(String quorum) throws Exception {
|
public void testTopicWithCollidingCharDeletionAndCreateAgain(String quorum) throws Exception {
|
||||||
// create the topic with colliding chars
|
// create the topic with colliding chars
|
||||||
|
@ -590,7 +590,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
), "Should be able to create a topic with colliding chars after deletion.");
|
), "Should be able to create a topic with colliding chars after deletion.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteInternalTopic(String quorum) throws Exception {
|
public void testDeleteInternalTopic(String quorum) throws Exception {
|
||||||
// create the offset topic
|
// create the offset topic
|
||||||
|
@ -610,7 +610,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
TestUtils.verifyTopicDeletion(zkClientOrNull(), Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions, brokers());
|
TestUtils.verifyTopicDeletion(zkClientOrNull(), Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions, brokers());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteWhenTopicDoesntExist(String quorum) {
|
public void testDeleteWhenTopicDoesntExist(String quorum) {
|
||||||
// delete a topic that does not exist
|
// delete a topic that does not exist
|
||||||
|
@ -619,15 +619,15 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
"Expected an exception when trying to delete a topic that does not exist.");
|
"Expected an exception when trying to delete a topic that does not exist.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteWhenTopicDoesntExistWithIfExists(String quorum) throws ExecutionException, InterruptedException {
|
public void testDeleteWhenTopicDoesntExistWithIfExists(String quorum) throws ExecutionException, InterruptedException {
|
||||||
topicService.deleteTopic(buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName, "--if-exists"));
|
topicService.deleteTopic(buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName, "--if-exists"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribe(String quorum) throws ExecutionException, InterruptedException {
|
public void testDescribe(String quorum) {
|
||||||
int partition = 2;
|
int partition = 2;
|
||||||
short replicationFactor = 2;
|
short replicationFactor = 2;
|
||||||
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor,
|
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor,
|
||||||
|
@ -639,7 +639,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Row does not start with " + testTopicName + ". Row is: " + rows[0]);
|
assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Row does not start with " + testTopicName + ". Row is: " + rows[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeWhenTopicDoesntExist(String quorum) {
|
public void testDescribeWhenTopicDoesntExist(String quorum) {
|
||||||
assertThrows(IllegalArgumentException.class,
|
assertThrows(IllegalArgumentException.class,
|
||||||
|
@ -647,13 +647,13 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
"Expected an exception when trying to describe a topic that does not exist.");
|
"Expected an exception when trying to describe a topic that does not exist.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeWhenTopicDoesntExistWithIfExists(String quorum) throws ExecutionException, InterruptedException {
|
public void testDescribeWhenTopicDoesntExistWithIfExists(String quorum) throws ExecutionException, InterruptedException {
|
||||||
topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName, "--if-exists"));
|
topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName, "--if-exists"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeUnavailablePartitions(String quorum) throws ExecutionException, InterruptedException {
|
public void testDescribeUnavailablePartitions(String quorum) throws ExecutionException, InterruptedException {
|
||||||
int partitions = 6;
|
int partitions = 6;
|
||||||
|
@ -716,9 +716,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeUnderReplicatedPartitions(String quorum) throws ExecutionException, InterruptedException {
|
public void testDescribeUnderReplicatedPartitions(String quorum) {
|
||||||
int partitions = 1;
|
int partitions = 1;
|
||||||
short replicationFactor = 6;
|
short replicationFactor = 6;
|
||||||
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor,
|
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor,
|
||||||
|
@ -739,9 +739,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeUnderMinIsrPartitions(String quorum) throws ExecutionException, InterruptedException {
|
public void testDescribeUnderMinIsrPartitions(String quorum) {
|
||||||
Properties topicConfig = new Properties();
|
Properties topicConfig = new Properties();
|
||||||
topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
|
topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
|
||||||
int partitions = 1;
|
int partitions = 1;
|
||||||
|
@ -769,7 +769,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String quorum) throws ExecutionException, InterruptedException {
|
public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String quorum) throws ExecutionException, InterruptedException {
|
||||||
TopicPartition tp = new TopicPartition(testTopicName, 0);
|
TopicPartition tp = new TopicPartition(testTopicName, 0);
|
||||||
|
@ -848,9 +848,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
|
TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeAtMinIsrPartitions(String quorum) throws ExecutionException, InterruptedException {
|
public void testDescribeAtMinIsrPartitions(String quorum) {
|
||||||
Properties topicConfig = new Properties();
|
Properties topicConfig = new Properties();
|
||||||
topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4");
|
topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4");
|
||||||
|
|
||||||
|
@ -892,9 +892,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
*
|
*
|
||||||
* Output should only display the (1) topic with partition under min ISR count and (3) topic with offline partition
|
* Output should only display the (1) topic with partition under min ISR count and (3) topic with offline partition
|
||||||
*/
|
*/
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeUnderMinIsrPartitionsMixed(String quorum) throws ExecutionException, InterruptedException {
|
public void testDescribeUnderMinIsrPartitionsMixed(String quorum) {
|
||||||
String underMinIsrTopic = "under-min-isr-topic";
|
String underMinIsrTopic = "under-min-isr-topic";
|
||||||
String notUnderMinIsrTopic = "not-under-min-isr-topic";
|
String notUnderMinIsrTopic = "not-under-min-isr-topic";
|
||||||
String offlineTopic = "offline-topic";
|
String offlineTopic = "offline-topic";
|
||||||
|
@ -952,9 +952,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeReportOverriddenConfigs(String quorum) throws Exception {
|
public void testDescribeReportOverriddenConfigs(String quorum) {
|
||||||
String config = "file.delete.delay.ms=1000";
|
String config = "file.delete.delay.ms=1000";
|
||||||
Properties topicConfig = new Properties();
|
Properties topicConfig = new Properties();
|
||||||
topicConfig.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1000");
|
topicConfig.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1000");
|
||||||
|
@ -968,9 +968,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertTrue(output.contains(config), String.format("Describe output should have contained %s", config));
|
assertTrue(output.contains(config), String.format("Describe output should have contained %s", config));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeAndListTopicsWithoutInternalTopics(String quorum) throws Exception {
|
public void testDescribeAndListTopicsWithoutInternalTopics(String quorum) {
|
||||||
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
|
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
|
||||||
scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
|
scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
|
||||||
);
|
);
|
||||||
|
@ -991,7 +991,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
"Output should not have contained " + Topic.GROUP_METADATA_TOPIC_NAME);
|
"Output should not have contained " + Topic.GROUP_METADATA_TOPIC_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(String quorum) throws Exception {
|
public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(String quorum) throws Exception {
|
||||||
adminClient = spy(adminClient);
|
adminClient = spy(adminClient);
|
||||||
|
@ -1015,9 +1015,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
|
||||||
assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Unexpected output: " + rows[0]);
|
assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Unexpected output: " + rows[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCreateWithTopicNameCollision(String quorum) throws ExecutionException, InterruptedException {
|
public void testCreateWithTopicNameCollision(String quorum) {
|
||||||
String topic = "foo_bar";
|
String topic = "foo_bar";
|
||||||
int partitions = 1;
|
int partitions = 1;
|
||||||
short replicationFactor = 6;
|
short replicationFactor = 6;
|
||||||
|
|
|
@ -26,11 +26,10 @@ import java.util.Collections;
|
||||||
|
|
||||||
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
|
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
|
||||||
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
|
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
|
||||||
import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
|
|
||||||
|
|
||||||
public class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
public class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
||||||
@SuppressWarnings({"deprecation"})
|
@SuppressWarnings({"deprecation"})
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeGroupCliWithGroupDescribe(String quorum) throws Exception {
|
public void testDescribeGroupCliWithGroupDescribe(String quorum) throws Exception {
|
||||||
addAndVerifyAcls(JavaConverters.asScalaSet(Collections.singleton(new AccessControlEntry(ClientPrincipal().toString(), "*", DESCRIBE, ALLOW))).toSet(), groupResource());
|
addAndVerifyAcls(JavaConverters.asScalaSet(Collections.singleton(new AccessControlEntry(ClientPrincipal().toString(), "*", DESCRIBE, ALLOW))).toSet(), groupResource());
|
||||||
|
|
|
@ -38,13 +38,12 @@ import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteWithTopicOption(String quorum) {
|
public void testDeleteWithTopicOption(String quorum) {
|
||||||
createOffsetsTopic(listenerName(), new Properties());
|
createOffsetsTopic(listenerName(), new Properties());
|
||||||
|
@ -52,7 +51,7 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
||||||
assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
|
assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteCmdNonExistingGroup(String quorum) {
|
public void testDeleteCmdNonExistingGroup(String quorum) {
|
||||||
createOffsetsTopic(listenerName(), new Properties());
|
createOffsetsTopic(listenerName(), new Properties());
|
||||||
|
@ -66,7 +65,7 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
||||||
"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
|
"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteNonExistingGroup(String quorum) {
|
public void testDeleteNonExistingGroup(String quorum) {
|
||||||
createOffsetsTopic(listenerName(), new Properties());
|
createOffsetsTopic(listenerName(), new Properties());
|
||||||
|
@ -81,7 +80,7 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
||||||
"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
|
"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteCmdNonEmptyGroup(String quorum) throws Exception {
|
public void testDeleteCmdNonEmptyGroup(String quorum) throws Exception {
|
||||||
createOffsetsTopic(listenerName(), new Properties());
|
createOffsetsTopic(listenerName(), new Properties());
|
||||||
|
@ -101,7 +100,7 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
||||||
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Output was: (" + output + ")");
|
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Output was: (" + output + ")");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteNonEmptyGroup(String quorum) throws Exception {
|
public void testDeleteNonEmptyGroup(String quorum) throws Exception {
|
||||||
createOffsetsTopic(listenerName(), new Properties());
|
createOffsetsTopic(listenerName(), new Properties());
|
||||||
|
@ -123,7 +122,7 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
||||||
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Result was:(" + result + ")");
|
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Result was:(" + result + ")");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteCmdEmptyGroup(String quorum) throws Exception {
|
public void testDeleteCmdEmptyGroup(String quorum) throws Exception {
|
||||||
createOffsetsTopic(listenerName(), new Properties());
|
createOffsetsTopic(listenerName(), new Properties());
|
||||||
|
@ -150,7 +149,7 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
||||||
"The consumer group could not be deleted as expected");
|
"The consumer group could not be deleted as expected");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteCmdAllGroups(String quorum) throws Exception {
|
public void testDeleteCmdAllGroups(String quorum) throws Exception {
|
||||||
createOffsetsTopic(listenerName(), new Properties());
|
createOffsetsTopic(listenerName(), new Properties());
|
||||||
|
@ -198,7 +197,7 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
||||||
"The consumer group(s) could not be deleted as expected");
|
"The consumer group(s) could not be deleted as expected");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteEmptyGroup(String quorum) throws Exception {
|
public void testDeleteEmptyGroup(String quorum) throws Exception {
|
||||||
createOffsetsTopic(listenerName(), new Properties());
|
createOffsetsTopic(listenerName(), new Properties());
|
||||||
|
@ -223,7 +222,7 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
||||||
"The consumer group could not be deleted as expected");
|
"The consumer group could not be deleted as expected");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteCmdWithMixOfSuccessAndError(String quorum) throws Exception {
|
public void testDeleteCmdWithMixOfSuccessAndError(String quorum) throws Exception {
|
||||||
createOffsetsTopic(listenerName(), new Properties());
|
createOffsetsTopic(listenerName(), new Properties());
|
||||||
|
@ -255,7 +254,7 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
||||||
"The consumer group deletion did not work as expected");
|
"The consumer group deletion did not work as expected");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteWithMixOfSuccessAndError(String quorum) throws Exception {
|
public void testDeleteWithMixOfSuccessAndError(String quorum) throws Exception {
|
||||||
createOffsetsTopic(listenerName(), new Properties());
|
createOffsetsTopic(listenerName(), new Properties());
|
||||||
|
@ -287,7 +286,7 @@ public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
||||||
"The consumer group deletion did not work as expected");
|
"The consumer group deletion did not work as expected");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteWithUnrecognizedNewConsumerOption(String quorum) {
|
public void testDeleteWithUnrecognizedNewConsumerOption(String quorum) {
|
||||||
String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
|
String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
|
||||||
|
|
|
@ -39,7 +39,6 @@ import java.util.Properties;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
|
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
|
||||||
import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
|
||||||
|
@ -53,7 +52,7 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGr
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteOffsetsNonExistingGroup(String quorum) {
|
public void testDeleteOffsetsNonExistingGroup(String quorum) {
|
||||||
String group = "missing.group";
|
String group = "missing.group";
|
||||||
|
@ -64,49 +63,49 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGr
|
||||||
assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey());
|
assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition(String quorum) {
|
public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition(String quorum) {
|
||||||
testWithStableConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
|
testWithStableConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly(String quorum) {
|
public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly(String quorum) {
|
||||||
testWithStableConsumerGroup(TOPIC, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
|
testWithStableConsumerGroup(TOPIC, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition(String quorum) {
|
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition(String quorum) {
|
||||||
testWithStableConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
|
testWithStableConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly(String quorum) {
|
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly(String quorum) {
|
||||||
testWithStableConsumerGroup("foobar", -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
|
testWithStableConsumerGroup("foobar", -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition(String quorum) {
|
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition(String quorum) {
|
||||||
testWithEmptyConsumerGroup(TOPIC, 0, 0, Errors.NONE);
|
testWithEmptyConsumerGroup(TOPIC, 0, 0, Errors.NONE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly(String quorum) {
|
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly(String quorum) {
|
||||||
testWithEmptyConsumerGroup(TOPIC, -1, 0, Errors.NONE);
|
testWithEmptyConsumerGroup(TOPIC, -1, 0, Errors.NONE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition(String quorum) {
|
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition(String quorum) {
|
||||||
testWithEmptyConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
|
testWithEmptyConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicOnly(String quorum) {
|
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicOnly(String quorum) {
|
||||||
testWithEmptyConsumerGroup("foobar", -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
|
testWithEmptyConsumerGroup("foobar", -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
|
||||||
|
|
|
@ -46,7 +46,6 @@ import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.kafka.test.TestUtils.RANDOM;
|
import static org.apache.kafka.test.TestUtils.RANDOM;
|
||||||
import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
|
import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
|
||||||
import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
@ -87,7 +86,7 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeWithMultipleSubActions(String quorum) {
|
public void testDescribeWithMultipleSubActions(String quorum) {
|
||||||
AtomicInteger exitStatus = new AtomicInteger(0);
|
AtomicInteger exitStatus = new AtomicInteger(0);
|
||||||
|
@ -107,7 +106,7 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
|
||||||
assertTrue(exitMessage.get().contains("Option [describe] takes at most one of these options"));
|
assertTrue(exitMessage.get().contains("Option [describe] takes at most one of these options"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeWithStateValue(String quorum) {
|
public void testDescribeWithStateValue(String quorum) {
|
||||||
AtomicInteger exitStatus = new AtomicInteger(0);
|
AtomicInteger exitStatus = new AtomicInteger(0);
|
||||||
|
@ -127,7 +126,7 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
|
||||||
assertTrue(exitMessage.get().contains("Option [describe] does not take a value for [state]"));
|
assertTrue(exitMessage.get().contains("Option [describe] does not take a value for [state]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testPrintVersion(String quorum) {
|
public void testPrintVersion(String quorum) {
|
||||||
ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
|
ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
|
||||||
|
@ -699,7 +698,7 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
|
||||||
}, "Expected a stable group with two members in describe group state result.");
|
}, "Expected a stable group with two members in describe group state result.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft", "kraft+kip848"})
|
@ValueSource(strings = {"zk", "kraft", "kraft+kip848"})
|
||||||
public void testDescribeSimpleConsumerGroup(String quorum) throws Exception {
|
public void testDescribeSimpleConsumerGroup(String quorum) throws Exception {
|
||||||
// Ensure that the offsets of consumers which don't use group management are still displayed
|
// Ensure that the offsets of consumers which don't use group management are still displayed
|
||||||
|
@ -791,7 +790,7 @@ public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
|
||||||
assertEquals(TimeoutException.class, e.getCause().getClass());
|
assertEquals(TimeoutException.class, e.getCause().getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testDescribeWithUnrecognizedNewConsumerOption(String quorum) {
|
public void testDescribeWithUnrecognizedNewConsumerOption(String quorum) {
|
||||||
String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
|
String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP};
|
||||||
|
|
|
@ -75,7 +75,6 @@ import java.util.stream.IntStream;
|
||||||
import static java.util.Arrays.asList;
|
import static java.util.Arrays.asList;
|
||||||
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
|
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
|
||||||
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
|
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
|
||||||
import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
|
|
||||||
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_FOLLOWER_THROTTLE;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_FOLLOWER_THROTTLE;
|
||||||
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_LEADER_THROTTLE;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_LEADER_THROTTLE;
|
||||||
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_LOG_DIR_THROTTLE;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_LOG_DIR_THROTTLE;
|
||||||
|
@ -109,7 +108,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testReassignment(String quorum) throws Exception {
|
public void testReassignment(String quorum) throws Exception {
|
||||||
cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
|
cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
|
||||||
|
@ -117,7 +116,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
executeAndVerifyReassignment();
|
executeAndVerifyReassignment();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = "zk") // Note: KRaft requires AlterPartition
|
@ValueSource(strings = "zk") // Note: KRaft requires AlterPartition
|
||||||
public void testReassignmentWithAlterPartitionDisabled(String quorum) throws Exception {
|
public void testReassignmentWithAlterPartitionDisabled(String quorum) throws Exception {
|
||||||
// Test reassignment when the IBP is on an older version which does not use
|
// Test reassignment when the IBP is on an older version which does not use
|
||||||
|
@ -130,7 +129,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
executeAndVerifyReassignment();
|
executeAndVerifyReassignment();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = "zk") // Note: KRaft requires AlterPartition
|
@ValueSource(strings = "zk") // Note: KRaft requires AlterPartition
|
||||||
public void testReassignmentCompletionDuringPartialUpgrade(String quorum) throws Exception {
|
public void testReassignmentCompletionDuringPartialUpgrade(String quorum) throws Exception {
|
||||||
// Test reassignment during a partial upgrade when some brokers are relying on
|
// Test reassignment during a partial upgrade when some brokers are relying on
|
||||||
|
@ -196,7 +195,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
verifyReplicaDeleted(bar0, 1);
|
verifyReplicaDeleted(bar0, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testHighWaterMarkAfterPartitionReassignment(String quorum) throws Exception {
|
public void testHighWaterMarkAfterPartitionReassignment(String quorum) throws Exception {
|
||||||
cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
|
cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
|
||||||
|
@ -226,7 +225,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
"Expected broker 3 to have the correct high water mark for the partition.");
|
"Expected broker 3 to have the correct high water mark for the partition.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testAlterReassignmentThrottle(String quorum) throws Exception {
|
public void testAlterReassignmentThrottle(String quorum) throws Exception {
|
||||||
cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
|
cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
|
||||||
|
@ -263,7 +262,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
/**
|
/**
|
||||||
* Test running a reassignment with the interBrokerThrottle set.
|
* Test running a reassignment with the interBrokerThrottle set.
|
||||||
*/
|
*/
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testThrottledReassignment(String quorum) throws Exception {
|
public void testThrottledReassignment(String quorum) throws Exception {
|
||||||
cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
|
cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
|
||||||
|
@ -325,7 +324,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
|
waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testProduceAndConsumeWithReassignmentInProgress(String quorum) throws Exception {
|
public void testProduceAndConsumeWithReassignmentInProgress(String quorum) throws Exception {
|
||||||
cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
|
cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
|
||||||
|
@ -366,7 +365,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
/**
|
/**
|
||||||
* Test running a reassignment and then cancelling it.
|
* Test running a reassignment and then cancelling it.
|
||||||
*/
|
*/
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCancellation(String quorum) throws Exception {
|
public void testCancellation(String quorum) throws Exception {
|
||||||
TopicPartition foo0 = new TopicPartition("foo", 0);
|
TopicPartition foo0 = new TopicPartition("foo", 0);
|
||||||
|
@ -410,7 +409,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
verifyReplicaDeleted(baz1, 3);
|
verifyReplicaDeleted(baz1, 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"zk", "kraft"})
|
@ValueSource(strings = {"zk", "kraft"})
|
||||||
public void testCancellationWithAddingReplicaInIsr(String quorum) throws Exception {
|
public void testCancellationWithAddingReplicaInIsr(String quorum) throws Exception {
|
||||||
TopicPartition foo0 = new TopicPartition("foo", 0);
|
TopicPartition foo0 = new TopicPartition("foo", 0);
|
||||||
|
@ -536,7 +535,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
/**
|
/**
|
||||||
* Test moving partitions between directories.
|
* Test moving partitions between directories.
|
||||||
*/
|
*/
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = "zk") // JBOD not yet implemented for KRaft
|
@ValueSource(strings = "zk") // JBOD not yet implemented for KRaft
|
||||||
public void testLogDirReassignment(String quorum) throws Exception {
|
public void testLogDirReassignment(String quorum) throws Exception {
|
||||||
TopicPartition topicPartition = new TopicPartition("foo", 0);
|
TopicPartition topicPartition = new TopicPartition("foo", 0);
|
||||||
|
@ -586,7 +585,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
assertEquals(reassignment.targetDir, info1.curLogDirs.getOrDefault(topicPartition, ""));
|
assertEquals(reassignment.targetDir, info1.curLogDirs.getOrDefault(topicPartition, ""));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME)
|
@ParameterizedTest
|
||||||
@ValueSource(strings = "zk") // JBOD not yet implemented for KRaft
|
@ValueSource(strings = "zk") // JBOD not yet implemented for KRaft
|
||||||
public void testAlterLogDirReassignmentThrottle(String quorum) throws Exception {
|
public void testAlterLogDirReassignmentThrottle(String quorum) throws Exception {
|
||||||
TopicPartition topicPartition = new TopicPartition("foo", 0);
|
TopicPartition topicPartition = new TopicPartition("foo", 0);
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
junit.jupiter.params.displayname.default = "{displayName}.{argumentsWithNames}"
|
Loading…
Reference in New Issue