MINOR: Align the constructor of KafkaConsumer to KafkaProducer (#8605)

1. Move KafkaProducer#propsToMap to Utils#propsToMap
2. Apply Utils#propsToMap to constructor of KafkaConsumer

Reviewers: Noa Resare <noa@resare.com>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Chia-Ping Tsai 2020-06-01 05:36:27 +08:00 committed by GitHub
parent 40b0033eed
commit d4c1ef4a10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 155 additions and 40 deletions

View File

@ -613,27 +613,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this(configs, null, null);
}
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value {@link Deserializer}.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}.
* <p>
* Note: after creating a {@code KafkaConsumer} you must always {@link #close()} it to avoid resource leaks.
*
* @param configs The consumer configs
* @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
* @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
*/
public KafkaConsumer(Map<String, Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
keyDeserializer,
valueDeserializer);
}
/**
* A consumer is instantiated by providing a {@link java.util.Properties} object as configuration.
* <p>
@ -664,12 +643,27 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public KafkaConsumer(Properties properties,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
keyDeserializer, valueDeserializer);
this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
}
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value {@link Deserializer}.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}.
* <p>
* Note: after creating a {@code KafkaConsumer} you must always {@link #close()} it to avoid resource leaks.
*
* @param configs The consumer configs
* @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
* @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
*/
@SuppressWarnings("unchecked")
private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
public KafkaConsumer(Map<String, Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
ConsumerConfig config = new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer));
try {
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
GroupRebalanceConfig.ProtocolType.CONSUMER);

View File

@ -78,7 +78,6 @@ import org.slf4j.Logger;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -298,7 +297,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* @param properties The producer configs
*/
public KafkaProducer(Properties properties) {
this(propsToMap(properties), null, null, null, null, null, Time.SYSTEM);
this(Utils.propsToMap(properties), null, null, null, null, null, Time.SYSTEM);
}
/**
@ -313,7 +312,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(propsToMap(properties), keySerializer, valueSerializer, null, null, null,
this(Utils.propsToMap(properties), keySerializer, valueSerializer, null, null, null,
Time.SYSTEM);
}
@ -1239,19 +1238,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
log.debug("Kafka producer has been closed");
}
private static Map<String, Object> propsToMap(Properties properties) {
Map<String, Object> map = new HashMap<>(properties.size());
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
if (entry.getKey() instanceof String) {
String k = (String) entry.getKey();
map.put(k, properties.get(k));
} else {
throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
}
}
return map;
}
private ClusterResourceListeners configureClusterResourceListeners(Serializer<K> keySerializer, Serializer<V> valueSerializer, List<?>... candidateLists) {
ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
for (List<?> candidateList: candidateLists)

View File

@ -20,6 +20,7 @@ import java.util.EnumSet;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -1210,4 +1211,22 @@ public final class Utils {
result.removeAll(right);
return result;
}
/**
* Convert a properties to map. All keys in properties must be string type. Otherwise, a ConfigException is thrown.
* @param properties to be converted
* @return a map including all elements in properties
*/
public static Map<String, Object> propsToMap(Properties properties) {
Map<String, Object> map = new HashMap<>(properties.size());
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
if (entry.getKey() instanceof String) {
String k = (String) entry.getKey();
map.put(k, properties.get(k));
} else {
throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
}
}
return map;
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
public class ProducerConfigTest {
private final Serializer<byte[]> keySerializer = new ByteArraySerializer();
private final Serializer<String> valueSerializer = new StringSerializer();
private final String keySerializerClassName = keySerializer.getClass().getName();
private final String valueSerializerClassName = valueSerializer.getClass().getName();
private final Object keySerializerClass = keySerializer.getClass();
private final Object valueSerializerClass = valueSerializer.getClass();
@Test
public void testSerializerToPropertyConfig() {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClassName);
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClassName);
Properties newProperties = ProducerConfig.addSerializerToConfig(properties, null, null);
assertEquals(newProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClassName);
assertEquals(newProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClassName);
properties.clear();
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClassName);
newProperties = ProducerConfig.addSerializerToConfig(properties, keySerializer, null);
assertEquals(newProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClassName);
assertEquals(newProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClassName);
properties.clear();
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClassName);
newProperties = ProducerConfig.addSerializerToConfig(properties, null, valueSerializer);
assertEquals(newProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClassName);
assertEquals(newProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClassName);
properties.clear();
newProperties = ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer);
assertEquals(newProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClassName);
assertEquals(newProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClassName);
}
@Test
public void testSerializerToMapConfig() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
Map<String, Object> newConfigs = ProducerConfig.addSerializerToConfig(configs, null, null);
assertEquals(newConfigs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClass);
assertEquals(newConfigs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClass);
configs.clear();
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
newConfigs = ProducerConfig.addSerializerToConfig(configs, keySerializer, null);
assertEquals(newConfigs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClass);
assertEquals(newConfigs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClass);
configs.clear();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
newConfigs = ProducerConfig.addSerializerToConfig(configs, null, valueSerializer);
assertEquals(newConfigs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClass);
assertEquals(newConfigs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClass);
configs.clear();
newConfigs = ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer);
assertEquals(newConfigs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClass);
assertEquals(newConfigs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClass);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.utils;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import org.mockito.stubbing.OngoingStubbing;
@ -59,6 +60,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@ -681,4 +683,26 @@ public class UtilsTest {
assertThat(diff, is(mkSet("a", "b")));
assertThat(diff.getClass(), equalTo(TreeSet.class));
}
@Test
public void testPropsToMap() {
assertThrows(ConfigException.class, () -> {
Properties props = new Properties();
props.put(1, 2);
Utils.propsToMap(props);
});
assertValue(false);
assertValue(1);
assertValue("string");
assertValue(1.1);
assertValue(Collections.emptySet());
assertValue(Collections.emptyList());
assertValue(Collections.emptyMap());
}
private static void assertValue(Object value) {
Properties props = new Properties();
props.put("key", value);
assertEquals(Utils.propsToMap(props).get("key"), value);
}
}