KAFKA-19042 Move ConsumerTopicCreationTest to client-integration-tests module (#19283)

This patch moves `ConsumerTopicCreationTest` to the
`client-integration-tests` and rewrite it as Java.
The patch also streamlines the test flow. 
In the Scala version, there is a producer that produces messages, but
this is not the main purpose of the `ConsumerTopicCreationTest`.

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2025-03-31 20:15:54 +08:00 committed by GitHub
parent c5f8ae0424
commit 20546930ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 125 additions and 108 deletions

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterTemplate;
import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
import static org.apache.kafka.common.test.api.Type.KRAFT;
import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ConsumerTopicCreationTest {
private static final String TOPIC = "topic";
private static final long POLL_TIMEOUT = 1000;
@ClusterTemplate("autoCreateTopicsConfigs")
void testAsyncConsumerTopicCreationIfConsumerAllowToCreateTopic(ClusterInstance cluster) throws Exception {
try (Consumer<byte[], byte[]> consumer = createConsumer(cluster, GroupProtocol.CONSUMER, true)) {
subscribeAndPoll(consumer);
assertTopicCreateBasedOnPermission(cluster);
}
}
@ClusterTemplate("autoCreateTopicsConfigs")
void testAsyncConsumerTopicCreationIfConsumerDisallowToCreateTopic(ClusterInstance cluster) throws Exception {
try (Consumer<byte[], byte[]> consumer = createConsumer(cluster, GroupProtocol.CONSUMER, false)) {
subscribeAndPoll(consumer);
assertTopicNotCreate(cluster);
}
}
@ClusterTemplate("autoCreateTopicsConfigs")
void testClassicConsumerTopicCreationIfConsumerAllowToCreateTopic(ClusterInstance cluster) throws Exception {
try (Consumer<byte[], byte[]> consumer = createConsumer(cluster, GroupProtocol.CLASSIC, true)) {
subscribeAndPoll(consumer);
assertTopicCreateBasedOnPermission(cluster);
}
}
@ClusterTemplate("autoCreateTopicsConfigs")
void testClassicConsumerTopicCreationIfConsumerDisallowToCreateTopic(ClusterInstance cluster) throws Exception {
try (Consumer<byte[], byte[]> consumer = createConsumer(cluster, GroupProtocol.CLASSIC, false)) {
subscribeAndPoll(consumer);
assertTopicNotCreate(cluster);
}
}
private void subscribeAndPoll(Consumer<byte[], byte[]> consumer) {
consumer.subscribe(List.of(TOPIC));
consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
}
private void assertTopicCreateBasedOnPermission(ClusterInstance cluster) throws Exception {
if (allowAutoCreateTopics(cluster))
assertTopicCreate(cluster);
else
assertTopicNotCreate(cluster);
}
private boolean allowAutoCreateTopics(ClusterInstance cluster) {
return cluster.config().serverProperties().get(AUTO_CREATE_TOPICS_ENABLE_CONFIG).equals("true");
}
private void assertTopicCreate(ClusterInstance cluster) throws Exception {
assertTrue(getAllTopics(cluster).contains(TOPIC));
}
private void assertTopicNotCreate(ClusterInstance cluster) throws Exception {
assertFalse(getAllTopics(cluster).contains(TOPIC),
"Both " + AUTO_CREATE_TOPICS_ENABLE_CONFIG + " and " + ALLOW_AUTO_CREATE_TOPICS_CONFIG + " need to be true to create topic automatically");
}
private List<String> getAllTopics(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.admin()) {
return admin.listTopics().listings().get().stream().map(TopicListing::name).toList();
}
}
private static List<ClusterConfig> autoCreateTopicsConfigs() {
return List.of(
ClusterConfig.defaultBuilder()
.setTypes(Set.of(KRAFT))
.setServerProperties(Map.of(AUTO_CREATE_TOPICS_ENABLE_CONFIG, "true"))
.build(),
ClusterConfig.defaultBuilder()
.setTypes(Set.of(KRAFT))
.setServerProperties(Map.of(AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false"))
.build()
);
}
private Consumer<byte[], byte[]> createConsumer(ClusterInstance cluster, GroupProtocol protocol, boolean allowConsumerAutoCreateTopics) {
Map<String, Object> consumerConfig = Map.of(
ALLOW_AUTO_CREATE_TOPICS_CONFIG, allowConsumerAutoCreateTopics,
GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT)
);
return cluster.consumer(consumerConfig);
}
}

View File

@ -1,108 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.lang.{Boolean => JBoolean}
import java.time.Duration
import java.util
import java.util.{Collections, Locale}
import kafka.utils.{EmptyTestInfo, TestUtils}
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.{ConsumerConfig, GroupProtocol}
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
/**
* Tests behavior of specifying auto topic creation configuration for the consumer and broker
*/
class ConsumerTopicCreationTest {
@ParameterizedTest(name = "{displayName}.groupProtocol={0}.brokerAutoTopicCreationEnable={1}.consumerAllowAutoCreateTopics={2}")
@MethodSource(Array("parameters"))
def testAutoTopicCreation(groupProtocol: String, brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean): Unit = {
val testCase = new ConsumerTopicCreationTest.TestCase(groupProtocol, brokerAutoTopicCreationEnable, consumerAllowAutoCreateTopics)
testCase.setUp(new EmptyTestInfo() {
override def getDisplayName = "quorum=kraft"
})
try testCase.test() finally testCase.tearDown()
}
}
object ConsumerTopicCreationTest {
private class TestCase(groupProtocol: String, brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean) extends IntegrationTestHarness {
private val topic_1 = "topic-1"
private val topic_2 = "topic-2"
private val producerClientId = "ConsumerTestProducer"
private val consumerClientId = "ConsumerTestConsumer"
// configure server properties
this.serverConfig.setProperty(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, "false") // speed up shutdown
this.serverConfig.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, brokerAutoTopicCreationEnable.toString)
// configure client properties
this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
this.consumerConfig.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, consumerAllowAutoCreateTopics.toString)
this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
override protected def brokerCount: Int = 1
def test(): Unit = {
val consumer = createConsumer()
val producer = createProducer()
val adminClient = createAdminClient()
val record = new ProducerRecord(topic_1, 0, "key".getBytes, "value".getBytes)
// create `topic_1` and produce a record to it
adminClient.createTopics(Collections.singleton(new NewTopic(topic_1, 1, 1.toShort))).all.get
producer.send(record).get
consumer.subscribe(util.Arrays.asList(topic_1, topic_2))
// Wait until the produced record was consumed. This guarantees that metadata request for `topic_2` was sent to the
// broker.
TestUtils.waitUntilTrue(() => {
consumer.poll(Duration.ofMillis(100)).count > 0
}, "Timed out waiting to consume")
// MetadataRequest is guaranteed to create the topic znode if creation was required
val topicCreated = getTopicIds().keySet.contains(topic_2)
if (brokerAutoTopicCreationEnable && consumerAllowAutoCreateTopics)
assertTrue(topicCreated)
else
assertFalse(topicCreated)
}
}
def parameters: java.util.stream.Stream[Arguments] = {
val data = new java.util.ArrayList[Arguments]()
for (brokerAutoTopicCreationEnable <- Array(JBoolean.TRUE, JBoolean.FALSE))
for (consumerAutoCreateTopicsPolicy <- Array(JBoolean.TRUE, JBoolean.FALSE))
data.add(Arguments.of(GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT), brokerAutoTopicCreationEnable, consumerAutoCreateTopicsPolicy))
data.stream()
}
}