mirror of https://github.com/apache/kafka.git
MINOR: reduce topicCommandTest brokers from 6 to 3 (#17875)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
b04a498317
commit
acd92be6ea
|
@ -50,8 +50,10 @@ import org.apache.kafka.common.requests.FetchRequest;
|
|||
import org.apache.kafka.common.requests.MetadataResponse;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.kafka.common.test.api.ClusterConfig;
|
||||
import org.apache.kafka.common.test.api.ClusterConfigProperty;
|
||||
import org.apache.kafka.common.test.api.ClusterInstance;
|
||||
import org.apache.kafka.common.test.api.ClusterTemplate;
|
||||
import org.apache.kafka.common.test.api.ClusterTest;
|
||||
import org.apache.kafka.common.test.api.ClusterTestExtensions;
|
||||
import org.apache.kafka.common.test.api.Type;
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
|
@ -373,7 +375,13 @@ public class TopicCommandTest {
|
|||
);
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(
|
||||
brokers = 3,
|
||||
serverProperties = {
|
||||
@ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100"),
|
||||
@ClusterConfigProperty(key = "log.segment.delete.delay.ms", value = "1000")
|
||||
}
|
||||
)
|
||||
public void testCreate(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
|
||||
|
@ -391,7 +399,13 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(
|
||||
brokers = 3,
|
||||
serverProperties = {
|
||||
@ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100"),
|
||||
@ClusterConfigProperty(key = "log.segment.delete.delay.ms", value = "1000")
|
||||
}
|
||||
)
|
||||
public void testCreateWithDefaults(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
|
||||
|
@ -418,7 +432,13 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(
|
||||
brokers = 3,
|
||||
serverProperties = {
|
||||
@ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100"),
|
||||
@ClusterConfigProperty(key = "log.segment.delete.delay.ms", value = "1000")
|
||||
}
|
||||
)
|
||||
public void testCreateWithDefaultReplication(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
|
||||
|
@ -436,7 +456,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testCreateWithDefaultPartitions(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
|
||||
|
@ -455,7 +475,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testCreateWithConfigs(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
|
||||
|
@ -474,7 +494,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testCreateWhenAlreadyExists(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
|
@ -492,7 +512,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testCreateWhenAlreadyExistsWithIfNotExists(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
|
@ -530,7 +550,6 @@ public class TopicCommandTest {
|
|||
.get(testTopicName)
|
||||
.partitions();
|
||||
|
||||
adminClient.close();
|
||||
assertEquals(3, partitions.size(),
|
||||
"Unequal partition size: " + partitions.size());
|
||||
assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 0),
|
||||
|
@ -542,7 +561,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testCreateWithInvalidReplicationFactor(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
|
@ -554,7 +573,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest
|
||||
public void testCreateWithNegativeReplicationFactor(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
|
@ -565,7 +584,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest
|
||||
public void testCreateWithNegativePartitionCount(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
|
@ -575,7 +594,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest
|
||||
public void testInvalidTopicLevelConfig(ClusterInstance clusterInstance) {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin()) {
|
||||
|
@ -588,7 +607,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest
|
||||
public void testListTopics(ClusterInstance clusterInstance) throws InterruptedException {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin()) {
|
||||
|
@ -600,7 +619,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testListTopicsWithIncludeList(ClusterInstance clusterInstance) throws InterruptedException {
|
||||
try (Admin adminClient = clusterInstance.admin()) {
|
||||
String topic1 = "kafka.testTopic1";
|
||||
|
@ -622,7 +641,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testListTopicsWithExcludeInternal(ClusterInstance clusterInstance) throws InterruptedException {
|
||||
try (Admin adminClient = clusterInstance.admin()) {
|
||||
String topic1 = "kafka.testTopic1";
|
||||
|
@ -638,7 +657,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testAlterPartitionCount(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
|
@ -695,7 +714,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testAlterAssignmentWithMoreAssignmentThanPartitions(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
|
@ -732,7 +751,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest
|
||||
public void testAlterWithInvalidPartitionCount(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
|
||||
|
@ -747,7 +766,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest
|
||||
public void testAlterWhenTopicDoesntExist(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
|
||||
|
@ -760,7 +779,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest
|
||||
public void testAlterWhenTopicDoesntExistWithIfExists(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
Admin adminClient = clusterInstance.admin();
|
||||
|
@ -823,7 +842,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testConfigPreservationAcrossPartitionAlteration(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
|
@ -857,7 +876,13 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(
|
||||
brokers = 3,
|
||||
serverProperties = {
|
||||
@ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100"),
|
||||
@ClusterConfigProperty(key = "log.segment.delete.delay.ms", value = "1000")
|
||||
}
|
||||
)
|
||||
public void testTopicDeletion(ClusterInstance clusterInstance) throws Exception {
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
|
||||
|
@ -877,7 +902,13 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(
|
||||
brokers = 3,
|
||||
serverProperties = {
|
||||
@ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100"),
|
||||
@ClusterConfigProperty(key = "log.segment.delete.delay.ms", value = "1000")
|
||||
}
|
||||
)
|
||||
public void testTopicWithCollidingCharDeletionAndCreateAgain(ClusterInstance clusterInstance) throws Exception {
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
|
||||
|
@ -902,7 +933,13 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(
|
||||
brokers = 3,
|
||||
serverProperties = {
|
||||
@ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100"),
|
||||
@ClusterConfigProperty(key = "log.segment.delete.delay.ms", value = "1000")
|
||||
}
|
||||
)
|
||||
public void testDeleteInternalTopic(ClusterInstance clusterInstance) throws Exception {
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
|
||||
|
@ -926,7 +963,13 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(
|
||||
brokers = 3,
|
||||
serverProperties = {
|
||||
@ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100"),
|
||||
@ClusterConfigProperty(key = "log.segment.delete.delay.ms", value = "1000")
|
||||
}
|
||||
)
|
||||
public void testDeleteWhenTopicDoesntExist(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
|
@ -938,7 +981,13 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(
|
||||
brokers = 3,
|
||||
serverProperties = {
|
||||
@ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100"),
|
||||
@ClusterConfigProperty(key = "log.segment.delete.delay.ms", value = "1000")
|
||||
}
|
||||
)
|
||||
public void testDeleteWhenTopicDoesntExistWithIfExists(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
|
@ -994,7 +1043,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest
|
||||
public void testDescribeWhenTopicDoesntExist(ClusterInstance clusterInstance) {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin()) {
|
||||
|
@ -1007,7 +1056,7 @@ public class TopicCommandTest {
|
|||
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest
|
||||
public void testDescribeWhenTopicDoesntExistWithIfExists(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin()) {
|
||||
|
@ -1015,17 +1064,16 @@ public class TopicCommandTest {
|
|||
|
||||
topicService.describeTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName, "--if-exists"));
|
||||
|
||||
adminClient.close();
|
||||
topicService.close();
|
||||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
public void testDescribeUnavailablePartitions(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testDescribeUnavailablePartitions(ClusterInstance clusterInstance) throws InterruptedException {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
|
||||
try (Admin adminClient = clusterInstance.admin()) {
|
||||
int partitions = 6;
|
||||
int partitions = 3;
|
||||
short replicationFactor = 1;
|
||||
|
||||
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor)));
|
||||
|
@ -1033,10 +1081,10 @@ public class TopicCommandTest {
|
|||
|
||||
// check which partition is on broker 0 which we'll kill
|
||||
clusterInstance.shutdownBroker(0);
|
||||
assertEquals(5, clusterInstance.aliveBrokers().size());
|
||||
assertEquals(2, clusterInstance.aliveBrokers().size());
|
||||
|
||||
// wait until the topic metadata for the test topic is propagated to each alive broker
|
||||
clusterInstance.waitForTopic(testTopicName, 6);
|
||||
clusterInstance.waitForTopic(testTopicName, 3);
|
||||
|
||||
// grab the console output and assert
|
||||
String output = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName, "--unavailable-partitions"));
|
||||
|
@ -1049,17 +1097,17 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testDescribeUnderReplicatedPartitions(ClusterInstance clusterInstance) throws InterruptedException {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin()) {
|
||||
int partitions = 1;
|
||||
short replicationFactor = 6;
|
||||
short replicationFactor = 3;
|
||||
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor)));
|
||||
clusterInstance.waitForTopic(testTopicName, partitions);
|
||||
|
||||
clusterInstance.shutdownBroker(0);
|
||||
Assertions.assertEquals(clusterInstance.aliveBrokers().size(), 5);
|
||||
Assertions.assertEquals(clusterInstance.aliveBrokers().size(), 2);
|
||||
|
||||
TestUtils.waitForCondition(
|
||||
() -> clusterInstance.aliveBrokers().values().stream().allMatch(
|
||||
|
@ -1077,23 +1125,23 @@ public class TopicCommandTest {
|
|||
}
|
||||
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testDescribeUnderMinIsrPartitions(ClusterInstance clusterInstance) throws InterruptedException {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
|
||||
try (Admin adminClient = clusterInstance.admin()) {
|
||||
Map<String, String> topicConfig = new HashMap<>();
|
||||
topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
|
||||
topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3");
|
||||
int partitions = 1;
|
||||
short replicationFactor = 6;
|
||||
short replicationFactor = 3;
|
||||
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig)));
|
||||
clusterInstance.waitForTopic(testTopicName, partitions);
|
||||
|
||||
clusterInstance.shutdownBroker(0);
|
||||
assertEquals(5, clusterInstance.aliveBrokers().size());
|
||||
assertEquals(2, clusterInstance.aliveBrokers().size());
|
||||
|
||||
TestUtils.waitForCondition(
|
||||
() -> clusterInstance.aliveBrokers().values().stream().allMatch(broker -> broker.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 5),
|
||||
() -> clusterInstance.aliveBrokers().values().stream().allMatch(broker -> broker.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 2),
|
||||
CLUSTER_WAIT_MS, String.format("Timeout waiting for partition metadata propagating to brokers for %s topic", testTopicName)
|
||||
);
|
||||
|
||||
|
@ -1275,7 +1323,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testDescribeReportOverriddenConfigs(ClusterInstance clusterInstance) throws InterruptedException {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin()) {
|
||||
|
@ -1294,7 +1342,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest
|
||||
public void testDescribeAndListTopicsWithoutInternalTopics(ClusterInstance clusterInstance) throws InterruptedException {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
try (Admin adminClient = clusterInstance.admin()) {
|
||||
|
@ -1316,7 +1364,7 @@ public class TopicCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest
|
||||
public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(ClusterInstance clusterInstance) throws Exception {
|
||||
String testTopicName = TestUtils.randomString(10);
|
||||
Admin adminClient = clusterInstance.admin();
|
||||
|
@ -1340,14 +1388,14 @@ public class TopicCommandTest {
|
|||
adminClient.close();
|
||||
}
|
||||
|
||||
@ClusterTemplate("generate")
|
||||
@ClusterTest(brokers = 3)
|
||||
public void testCreateWithTopicNameCollision(ClusterInstance clusterInstance) throws Exception {
|
||||
try (Admin adminClient = clusterInstance.admin();
|
||||
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
|
||||
|
||||
String topic = "foo_bar";
|
||||
int partitions = 1;
|
||||
short replicationFactor = 6;
|
||||
short replicationFactor = 3;
|
||||
adminClient.createTopics(Collections.singletonList(new NewTopic(topic, partitions, replicationFactor)));
|
||||
clusterInstance.waitForTopic(topic, defaultNumPartitions);
|
||||
|
||||
|
|
Loading…
Reference in New Issue