mirror of https://github.com/apache/kafka.git
KAFKA-18075 Prevent ClusterInstance default producer and consumer initialization with empty configs (#17926)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
87b902d35d
commit
5ad532f4ad
|
@ -166,7 +166,7 @@ public interface ClusterInstance {
|
||||||
}
|
}
|
||||||
|
|
||||||
default <K, V> Producer<K, V> producer() {
|
default <K, V> Producer<K, V> producer() {
|
||||||
return new KafkaProducer<>(Map.of());
|
return producer(Map.of());
|
||||||
}
|
}
|
||||||
|
|
||||||
default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
|
default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
|
||||||
|
@ -180,7 +180,7 @@ public interface ClusterInstance {
|
||||||
}
|
}
|
||||||
|
|
||||||
default <K, V> Consumer<K, V> consumer() {
|
default <K, V> Consumer<K, V> consumer() {
|
||||||
return new KafkaConsumer<>(Map.of());
|
return consumer(Map.of());
|
||||||
}
|
}
|
||||||
|
|
||||||
default Admin admin(Map<String, Object> configs, boolean usingBootstrapControllers) {
|
default Admin admin(Map<String, Object> configs, boolean usingBootstrapControllers) {
|
||||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.kafka.common.config.ConfigResource;
|
||||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
import org.apache.kafka.common.serialization.StringSerializer;
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
import org.apache.kafka.common.test.TestUtils;
|
import org.apache.kafka.common.test.TestUtils;
|
||||||
|
import org.apache.kafka.common.utils.Bytes;
|
||||||
|
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
|
||||||
import org.apache.kafka.server.common.MetadataVersion;
|
import org.apache.kafka.server.common.MetadataVersion;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
@ -292,4 +294,32 @@ public class ClusterTestExtensionsTest {
|
||||||
assertEquals(value, records.get(0).value());
|
assertEquals(value, records.get(0).value());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, serverProperties = {
|
||||||
|
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
|
||||||
|
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
|
||||||
|
})
|
||||||
|
public void testCreateDefaultProducerAndConsumer(ClusterInstance cluster) throws InterruptedException {
|
||||||
|
String topic = "topic";
|
||||||
|
Bytes key = Bytes.wrap("key".getBytes());
|
||||||
|
Bytes value = Bytes.wrap("value".getBytes());
|
||||||
|
try (Admin adminClient = cluster.admin();
|
||||||
|
Producer<Bytes, Bytes> producer = cluster.producer();
|
||||||
|
Consumer<Bytes, Bytes> consumer = cluster.consumer()
|
||||||
|
) {
|
||||||
|
adminClient.createTopics(singleton(new NewTopic(topic, 1, (short) 1)));
|
||||||
|
assertNotNull(producer);
|
||||||
|
assertNotNull(consumer);
|
||||||
|
producer.send(new ProducerRecord<>(topic, key, value));
|
||||||
|
producer.flush();
|
||||||
|
consumer.subscribe(singletonList(topic));
|
||||||
|
List<ConsumerRecord<Bytes, Bytes>> records = new ArrayList<>();
|
||||||
|
TestUtils.waitForCondition(() -> {
|
||||||
|
consumer.poll(Duration.ofMillis(100)).forEach(records::add);
|
||||||
|
return records.size() == 1;
|
||||||
|
}, "Failed to receive message");
|
||||||
|
assertEquals(key, records.get(0).key());
|
||||||
|
assertEquals(value, records.get(0).value());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue