KAFKA-3639; Configure default serdes upon construction

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Michael G. Noll <michael@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1311 from guozhangwang/K3639
This commit is contained in:
Guozhang Wang 2016-05-05 00:23:34 +01:00 committed by Ismael Juma
parent da7095f368
commit c8c6ac3f6d
4 changed files with 107 additions and 78 deletions

View File

@ -13,12 +13,32 @@
package org.apache.kafka.common.serialization;
import java.io.Closeable;
import java.util.Map;
/**
* The interface for wrapping a serializer and deserializer for the given data type.
*
* @param <T>
* @param <T> Type to be serialized from and deserialized into.
*
* A class that implements this interface is expected to have a constructor with no parameter.
*/
public interface Serde<T> {
public interface Serde<T> extends Closeable {
/**
* Configure this class, which will configure the underlying serializer and deserializer.
*
* @param configs configs in key/value pairs
* @param isKey whether is for key or value
*/
void configure(Map<String, ?> configs, boolean isKey);
/**
* Close this serde class, which will close the underlying serializer and deserializer.
* This method has to be idempotent because it might be called multiple times.
*/
@Override
void close();
Serializer<T> serializer();

View File

@ -16,93 +16,84 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.common.utils.Bytes;
import java.nio.ByteBuffer;
import java.util.Map;
/**
* Factory for creating serializers / deserializers.
*/
public class Serdes {
static public final class LongSerde implements Serde<Long> {
@Override
public Serializer<Long> serializer() {
return new LongSerializer();
static private class WrapperSerde<T> implements Serde<T> {
final private Serializer<T> serializer;
final private Deserializer<T> deserializer;
public WrapperSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
this.serializer = serializer;
this.deserializer = deserializer;
}
@Override
public Deserializer<Long> deserializer() {
return new LongDeserializer();
public void configure(Map<String, ?> configs, boolean isKey) {
serializer.configure(configs, isKey);
deserializer.configure(configs, isKey);
}
@Override
public void close() {
serializer.close();
deserializer.close();
}
@Override
public Serializer<T> serializer() {
return serializer;
}
@Override
public Deserializer<T> deserializer() {
return deserializer;
}
}
static public final class IntegerSerde implements Serde<Integer> {
@Override
public Serializer<Integer> serializer() {
return new IntegerSerializer();
}
@Override
public Deserializer<Integer> deserializer() {
return new IntegerDeserializer();
static public final class LongSerde extends WrapperSerde<Long> {
public LongSerde() {
super(new LongSerializer(), new LongDeserializer());
}
}
static public final class DoubleSerde implements Serde<Double> {
@Override
public Serializer<Double> serializer() {
return new DoubleSerializer();
}
@Override
public Deserializer<Double> deserializer() {
return new DoubleDeserializer();
static public final class IntegerSerde extends WrapperSerde<Integer> {
public IntegerSerde() {
super(new IntegerSerializer(), new IntegerDeserializer());
}
}
static public final class StringSerde implements Serde<String> {
@Override
public Serializer<String> serializer() {
return new StringSerializer();
}
@Override
public Deserializer<String> deserializer() {
return new StringDeserializer();
static public final class DoubleSerde extends WrapperSerde<Double> {
public DoubleSerde() {
super(new DoubleSerializer(), new DoubleDeserializer());
}
}
static public final class ByteBufferSerde implements Serde<ByteBuffer> {
@Override
public Serializer<ByteBuffer> serializer() {
return new ByteBufferSerializer();
}
@Override
public Deserializer<ByteBuffer> deserializer() {
return new ByteBufferDeserializer();
static public final class StringSerde extends WrapperSerde<String> {
public StringSerde() {
super(new StringSerializer(), new StringDeserializer());
}
}
static public final class BytesSerde implements Serde<Bytes> {
@Override
public Serializer<Bytes> serializer() {
return new BytesSerializer();
}
@Override
public Deserializer<Bytes> deserializer() {
return new BytesDeserializer();
static public final class ByteBufferSerde extends WrapperSerde<ByteBuffer> {
public ByteBufferSerde() {
super(new ByteBufferSerializer(), new ByteBufferDeserializer());
}
}
static public final class ByteArraySerde implements Serde<byte[]> {
@Override
public Serializer<byte[]> serializer() {
return new ByteArraySerializer();
static public final class BytesSerde extends WrapperSerde<Bytes> {
public BytesSerde() {
super(new BytesSerializer(), new BytesDeserializer());
}
}
@Override
public Deserializer<byte[]> deserializer() {
return new ByteArrayDeserializer();
static public final class ByteArraySerde extends WrapperSerde<byte[]> {
public ByteArraySerde() {
super(new ByteArraySerializer(), new ByteArrayDeserializer());
}
}
@ -154,17 +145,7 @@ public class Serdes {
throw new IllegalArgumentException("deserializer must not be null");
}
return new Serde<T>() {
@Override
public Serializer<T> serializer() {
return serializer;
}
@Override
public Deserializer<T> deserializer() {
return deserializer;
}
};
return new WrapperSerde<>(serializer, deserializer);
}
/*

View File

@ -300,11 +300,17 @@ public class StreamsConfig extends AbstractConfig {
}
public Serde keySerde() {
return getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class);
Serde<?> serde = getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class);
serde.configure(originals(), true);
return serde;
}
public Serde valueSerde() {
return getConfiguredInstance(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serde.class);
Serde<?> serde = getConfiguredInstance(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serde.class);
serde.configure(originals(), false);
return serde;
}
public static void main(String[] args) {

View File

@ -18,10 +18,12 @@
package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
@ -31,13 +33,15 @@ public class StreamsConfigTest {
private Properties props = new Properties();
private StreamsConfig streamsConfig;
private StreamThread streamThreadPlaceHolder;
@Before
public void setUp() {
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put("key.deserializer.encoding", "UTF8");
props.put("value.deserializer.encoding", "UTF-16");
streamsConfig = new StreamsConfig(props);
}
@ -49,8 +53,7 @@ public class StreamsConfigTest {
@Test
public void testGetConsumerConfigs() throws Exception {
Map<String, Object> returnedProps =
streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-application", "client");
Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, "example-application", "client");
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
@ -62,4 +65,23 @@ public class StreamsConfigTest {
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
}
@Test
public void defaultSerdeShouldBeConfigured() {
Map<String, Object> serializerConfigs = new HashMap<String, Object>();
serializerConfigs.put("key.serializer.encoding", "UTF8");
serializerConfigs.put("value.serializer.encoding", "UTF-16");
Serializer<String> serializer = Serdes.String().serializer();
String str = "my string for testing";
String topic = "my topic";
serializer.configure(serializerConfigs, true);
assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
str, streamsConfig.keySerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));
serializer.configure(serializerConfigs, false);
assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
str, streamsConfig.valueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));
}
}