MINOR: Refactor ShareConsumerTest to use ClusterTestExtensions. (#18656)

Reviewers: ShivsundarR <shr@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Sushant Mahajan 2025-01-23 22:05:33 +05:30 committed by GitHub
parent aea699bdef
commit 01afba8fdb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 332 additions and 258 deletions

File diff suppressed because it is too large Load Diff

View File

@ -32,6 +32,8 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
@ -188,6 +190,20 @@ public interface ClusterInstance {
return consumer(Map.of()); return consumer(Map.of());
} }
default <K, V> ShareConsumer<K, V> shareConsumer() {
return shareConsumer(Map.of());
}
default <K, V> ShareConsumer<K, V> shareConsumer(Map<String, Object> configs) {
Map<String, Object> props = new HashMap<>(configs);
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5));
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
return new KafkaShareConsumer<>(setClientSaslConfig(props));
}
default Admin admin(Map<String, Object> configs, boolean usingBootstrapControllers) { default Admin admin(Map<String, Object> configs, boolean usingBootstrapControllers) {
Map<String, Object> props = new HashMap<>(configs); Map<String, Object> props = new HashMap<>(configs);
if (usingBootstrapControllers) { if (usingBootstrapControllers) {