KAFKA-6161 Add default implementation to close() and configure() for Serdes (#5348)

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Chia-Ping Tsai 2019-02-22 01:05:13 +08:00 committed by Guozhang Wang
parent 8de3092b05
commit 35a0de32ee
37 changed files with 33 additions and 307 deletions

View File

@ -16,22 +16,10 @@
*/
package org.apache.kafka.common.serialization;
import java.util.Map;
public class ByteArrayDeserializer implements Deserializer<byte[]> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
@Override
public byte[] deserialize(String topic, byte[] data) {
return data;
}
@Override
public void close() {
// nothing to do
}
}

View File

@ -16,22 +16,9 @@
*/
package org.apache.kafka.common.serialization;
import java.util.Map;
public class ByteArraySerializer implements Serializer<byte[]> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
@Override
public byte[] serialize(String topic, byte[] data) {
return data;
}
@Override
public void close() {
// nothing to do
}
}

View File

@ -17,22 +17,12 @@
package org.apache.kafka.common.serialization;
import java.nio.ByteBuffer;
import java.util.Map;
public class ByteBufferDeserializer implements Deserializer<ByteBuffer> {
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
public ByteBuffer deserialize(String topic, byte[] data) {
if (data == null)
return null;
return ByteBuffer.wrap(data);
}
public void close() {
// nothing to do
}
}

View File

@ -17,14 +17,8 @@
package org.apache.kafka.common.serialization;
import java.nio.ByteBuffer;
import java.util.Map;
public class ByteBufferSerializer implements Serializer<ByteBuffer> {
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
public byte[] serialize(String topic, ByteBuffer data) {
if (data == null)
return null;
@ -43,8 +37,4 @@ public class ByteBufferSerializer implements Serializer<ByteBuffer> {
data.rewind();
return ret;
}
public void close() {
// nothing to do
}
}

View File

@ -18,22 +18,11 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.common.utils.Bytes;
import java.util.Map;
public class BytesDeserializer implements Deserializer<Bytes> {
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
public Bytes deserialize(String topic, byte[] data) {
if (data == null)
return null;
return new Bytes(data);
}
public void close() {
// nothing to do
}
}

View File

@ -18,23 +18,12 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.common.utils.Bytes;
import java.util.Map;
public class BytesSerializer implements Serializer<Bytes> {
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
public byte[] serialize(String topic, Bytes data) {
if (data == null)
return null;
return data.get();
}
public void close() {
// nothing to do
}
}

View File

@ -37,7 +37,9 @@ public interface Deserializer<T> extends Closeable {
* @param configs configs in key/value pairs
* @param isKey whether is for key or value
*/
void configure(Map<String, ?> configs, boolean isKey);
default void configure(Map<String, ?> configs, boolean isKey) {
// intentionally left blank
}
/**
* Deserialize a record value from a byte array into a value or object.
@ -58,6 +60,13 @@ public interface Deserializer<T> extends Closeable {
return deserialize(topic, data);
}
/**
* Close this deserializer.
* <p>
* This method must be idempotent as it may be called multiple times.
*/
@Override
void close();
default void close() {
// intentionally left blank
}
}

View File

@ -18,15 +18,8 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Map;
public class DoubleDeserializer implements Deserializer<Double> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
@Override
public Double deserialize(String topic, byte[] data) {
if (data == null)
@ -42,9 +35,4 @@ public class DoubleDeserializer implements Deserializer<Double> {
}
return Double.longBitsToDouble(value);
}
@Override
public void close() {
// nothing to do
}
}

View File

@ -16,15 +16,7 @@
*/
package org.apache.kafka.common.serialization;
import java.util.Map;
public class DoubleSerializer implements Serializer<Double> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
@Override
public byte[] serialize(String topic, Double data) {
if (data == null)
@ -42,9 +34,4 @@ public class DoubleSerializer implements Serializer<Double> {
(byte) bits
};
}
@Override
public void close() {
// nothing to do
}
}

View File

@ -18,15 +18,7 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Map;
public class FloatDeserializer implements Deserializer<Float> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
// nothing to do
}
@Override
public Float deserialize(final String topic, final byte[] data) {
if (data == null)
@ -42,10 +34,4 @@ public class FloatDeserializer implements Deserializer<Float> {
}
return Float.intBitsToFloat(value);
}
@Override
public void close() {
// nothing to do
}
}

View File

@ -16,15 +16,7 @@
*/
package org.apache.kafka.common.serialization;
import java.util.Map;
public class FloatSerializer implements Serializer<Float> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
// nothing to do
}
@Override
public byte[] serialize(final String topic, final Float data) {
if (data == null)
@ -38,9 +30,4 @@ public class FloatSerializer implements Serializer<Float> {
(byte) bits
};
}
@Override
public void close() {
// nothing to do
}
}

View File

@ -18,14 +18,7 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Map;
public class IntegerDeserializer implements Deserializer<Integer> {
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
public Integer deserialize(String topic, byte[] data) {
if (data == null)
return null;
@ -40,8 +33,4 @@ public class IntegerDeserializer implements Deserializer<Integer> {
}
return value;
}
public void close() {
// nothing to do
}
}

View File

@ -16,14 +16,7 @@
*/
package org.apache.kafka.common.serialization;
import java.util.Map;
public class IntegerSerializer implements Serializer<Integer> {
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
public byte[] serialize(String topic, Integer data) {
if (data == null)
return null;
@ -35,8 +28,4 @@ public class IntegerSerializer implements Serializer<Integer> {
data.byteValue()
};
}
public void close() {
// nothing to do
}
}

View File

@ -18,14 +18,7 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Map;
public class LongDeserializer implements Deserializer<Long> {
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
public Long deserialize(String topic, byte[] data) {
if (data == null)
return null;
@ -40,8 +33,4 @@ public class LongDeserializer implements Deserializer<Long> {
}
return value;
}
public void close() {
// nothing to do
}
}

View File

@ -16,14 +16,7 @@
*/
package org.apache.kafka.common.serialization;
import java.util.Map;
public class LongSerializer implements Serializer<Long> {
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
public byte[] serialize(String topic, Long data) {
if (data == null)
return null;
@ -39,8 +32,4 @@ public class LongSerializer implements Serializer<Long> {
data.byteValue()
};
}
public void close() {
// nothing to do
}
}

View File

@ -34,14 +34,19 @@ public interface Serde<T> extends Closeable {
* @param configs configs in key/value pairs
* @param isKey whether is for key or value
*/
void configure(Map<String, ?> configs, boolean isKey);
default void configure(Map<String, ?> configs, boolean isKey) {
// intentionally left blank
}
/**
* Close this serde class, which will close the underlying serializer and deserializer.
* <p>
* This method has to be idempotent because it might be called multiple times.
*/
@Override
void close();
default void close() {
// intentionally left blank
}
Serializer<T> serializer();

View File

@ -37,7 +37,9 @@ public interface Serializer<T> extends Closeable {
* @param configs configs in key/value pairs
* @param isKey whether is for key or value
*/
void configure(Map<String, ?> configs, boolean isKey);
default void configure(Map<String, ?> configs, boolean isKey) {
// intentionally left blank
}
/**
* Convert {@code data} into a byte array.
@ -62,9 +64,11 @@ public interface Serializer<T> extends Closeable {
/**
* Close this serializer.
*
* <p>
* This method must be idempotent as it may be called multiple times.
*/
@Override
void close();
default void close() {
// intentionally left blank
}
}

View File

@ -18,14 +18,8 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.common.errors.SerializationException;
import java.util.Map;
public class ShortDeserializer implements Deserializer<Short> {
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
public Short deserialize(String topic, byte[] data) {
if (data == null)
return null;
@ -40,8 +34,4 @@ public class ShortDeserializer implements Deserializer<Short> {
}
return value;
}
public void close() {
// nothing to do
}
}

View File

@ -16,14 +16,7 @@
*/
package org.apache.kafka.common.serialization;
import java.util.Map;
public class ShortSerializer implements Serializer<Short> {
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
public byte[] serialize(String topic, Short data) {
if (data == null)
return null;
@ -33,8 +26,4 @@ public class ShortSerializer implements Serializer<Short> {
data.byteValue()
};
}
public void close() {
// nothing to do
}
}

View File

@ -49,9 +49,4 @@ public class StringDeserializer implements Deserializer<String> {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}

View File

@ -49,9 +49,4 @@ public class StringSerializer implements Serializer<String> {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}

View File

@ -52,9 +52,4 @@ public class UUIDDeserializer implements Deserializer<UUID> {
throw new SerializationException("Error parsing data into UUID", e);
}
}
@Override
public void close() {
// do nothing
}
}

View File

@ -50,9 +50,4 @@ public class UUIDSerializer implements Serializer<UUID> {
throw new SerializationException("Error when serializing UUID to byte[] due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}

View File

@ -20,7 +20,6 @@ import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -35,10 +34,6 @@ public class MockSerializer implements ClusterResourceListener, Serializer<byte[
INIT_COUNT.incrementAndGet();
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, byte[] data) {
// This will ensure that we get the cluster metadata when serialize is called for the first time

View File

@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
/**
* JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily
* structured data without having associated Java classes. This deserializer also supports Connect schemas.
@ -36,9 +34,6 @@ public class JsonDeserializer implements Deserializer<JsonNode> {
public JsonDeserializer() {
}
@Override
public void configure(Map<String, ?> props, boolean isKey) {
}
@Override
public JsonNode deserialize(String topic, byte[] bytes) {
@ -54,9 +49,4 @@ public class JsonDeserializer implements Deserializer<JsonNode> {
return data;
}
@Override
public void close() {
}
}

View File

@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
/**
* Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily
* structured data without corresponding Java classes. This serializer also supports Connect schemas.
@ -37,10 +35,6 @@ public class JsonSerializer implements Serializer<JsonNode> {
}
@Override
public void configure(Map<String, ?> config, boolean isKey) {
}
@Override
public byte[] serialize(String topic, JsonNode data) {
if (data == null)
@ -52,9 +46,4 @@ public class JsonSerializer implements Serializer<JsonNode> {
throw new SerializationException("Error serializing JSON message", e);
}
}
@Override
public void close() {
}
}

View File

@ -27,16 +27,11 @@ import org.junit.Test
import org.scalatest.mockito.MockitoSugar
class CustomDeserializer extends Deserializer[String] {
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
}
override def deserialize(topic: String, data: Array[Byte]): String = {
assertThat("topic must not be null", topic, CoreMatchers.notNullValue)
new String(data)
}
override def close(): Unit = {
}
}
class CustomDeserializerTest extends MockitoSugar {

View File

@ -73,6 +73,11 @@
As of 2.3.0 Streams now offers an in-memory version of the window store, in addition to the persistent one based on RocksDB. The new public interface <code>inMemoryWindowStore()</code> is added to Stores that provides a built-in in-memory window store.
</p>
<p>
In 2.3.0 we have added default implementation to close() and configure() for <code>Serializer</code>, <code>Deserializer</code> and <code>Serde</code> so that they can be
implemented by lambda expression. For more details please read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-331+Add+default+implementation+to+close%28%29+and+configure%28%29+for+Serializer%2C+Deserializer+and+Serde">KIP-331</a>.
</p>
<h3><a id="streams_api_changes_220" href="#streams_api_changes_220">Streams API changes in 2.2.0</a></h3>
<p>
We've simplified the <code>KafkaStreams#state</code> transition diagram during the starting up phase a bit in 2.2.0: in older versions the state will transit from <code>CREATED</code> to <code>RUNNING</code>, and then to <code>REBALANCING</code> to get the first

View File

@ -304,6 +304,7 @@ public class Topology {
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by name
*/
@SuppressWarnings("overloads")
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final Deserializer keyDeserializer,
@ -359,7 +360,7 @@ public class Topology {
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by another source
*/
@SuppressWarnings("overloads")
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
@ -391,6 +392,7 @@ public class Topology {
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by name
*/
@SuppressWarnings("overloads")
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,

View File

@ -20,7 +20,6 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.ByteBuffer;
import java.util.Map;
public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
@ -40,11 +39,6 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
this.inner = inner;
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
// do nothing
}
@Override
public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
import java.nio.ByteBuffer;
import java.util.Map;
public class ChangedSerializer<T> implements Serializer<Change<T>> {
@ -41,11 +40,6 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
this.inner = inner;
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
// do nothing
}
/**
* @throws StreamsException if both old and new values of data are null, or if
* both values are not null

View File

@ -645,9 +645,6 @@ public class StreamsConfigTest {
throw new RuntimeException("boom");
}
@Override
public void close() {}
@Override
public Serializer serializer() {
return null;

View File

@ -188,9 +188,6 @@ public class YahooBenchmark {
@SuppressWarnings("WeakerAccess")
public JsonPOJOSerializer() {}
@Override
public void configure(final Map<String, ?> props, final boolean isKey) {}
@Override
public byte[] serialize(final String topic, final T data) {
if (data == null) {
@ -203,10 +200,6 @@ public class YahooBenchmark {
throw new SerializationException("Error serializing JSON message", e);
}
}
@Override
public void close() {}
}
// Note: these are also in the streams example package, eventuall use 1 file
@ -242,11 +235,6 @@ public class YahooBenchmark {
return data;
}
@Override
public void close() {
}
}
private KafkaStreams createYahooBenchmarkStreams(final Properties streamConfig, final String campaignsTopic, final String eventsTopic,

View File

@ -23,7 +23,6 @@ import org.apache.kafka.test.MockSourceNode;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -51,15 +50,9 @@ public class SourceNodeTest {
return topic + headers + new String(data, StandardCharsets.UTF_8);
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) { }
@Override
public String deserialize(final String topic, final byte[] data) {
return deserialize(topic, null, data);
}
@Override
public void close() { }
}
}

View File

@ -22,19 +22,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Map;
class SerdeThatDoesntHandleNull implements Serde<String> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
}
@Override
public void close() {
}
@Override
public Serializer<String> serializer() {
return new StringSerializer();

View File

@ -473,10 +473,6 @@ public class TopologyTestDriverTest {
}
return Serdes.Integer().serializer().serialize(topic, (Integer) data);
}
@Override
public void close() {}
@Override
public void configure(final Map configs, final boolean isKey) {}
},
new Serializer<Object>() {
@Override
@ -486,10 +482,6 @@ public class TopologyTestDriverTest {
}
return Serdes.Double().serializer().serialize(topic, (Double) data);
}
@Override
public void close() {}
@Override
public void configure(final Map configs, final boolean isKey) {}
},
processor);

View File

@ -339,21 +339,11 @@ public class ClientCompatibilityTest {
this.expectClusterId = expectClusterId;
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
@Override
public byte[] deserialize(String topic, byte[] data) {
return data;
}
@Override
public void close() {
// nothing to do
}
@Override
public void onUpdate(ClusterResource clusterResource) {
if (expectClusterId) {