diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java index 267211576b6..1147f45534f 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java @@ -16,22 +16,10 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class ByteArrayDeserializer implements Deserializer { - @Override - public void configure(Map configs, boolean isKey) { - // nothing to do - } - @Override public byte[] deserialize(String topic, byte[] data) { return data; } - - @Override - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java index d069e9495e6..6bebaa6531f 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java @@ -16,22 +16,9 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class ByteArraySerializer implements Serializer { - - @Override - public void configure(Map configs, boolean isKey) { - // nothing to do - } - @Override public byte[] serialize(String topic, byte[] data) { return data; } - - @Override - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java index d41f03c6675..0dfcf5f26c3 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java @@ -17,22 +17,12 @@ package org.apache.kafka.common.serialization; import java.nio.ByteBuffer; -import java.util.Map; public class ByteBufferDeserializer implements Deserializer { - - public void configure(Map configs, boolean isKey) { - // nothing to do - } - public ByteBuffer deserialize(String topic, byte[] data) { if (data == null) return null; return ByteBuffer.wrap(data); } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java index c8c369272dd..9fb12544e0f 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java @@ -17,14 +17,8 @@ package org.apache.kafka.common.serialization; import java.nio.ByteBuffer; -import java.util.Map; public class ByteBufferSerializer implements Serializer { - - public void configure(Map configs, boolean isKey) { - // nothing to do - } - public byte[] serialize(String topic, ByteBuffer data) { if (data == null) return null; @@ -43,8 +37,4 @@ public class ByteBufferSerializer implements Serializer { data.rewind(); return ret; } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java index 66b07eb5841..1350dca21dd 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java @@ -18,22 +18,11 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.utils.Bytes; -import java.util.Map; - public class BytesDeserializer implements Deserializer { - - public void configure(Map configs, boolean isKey) { - // nothing to do - } - public Bytes deserialize(String topic, byte[] data) { if (data == null) return null; return new Bytes(data); } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java index 0dc4476d46d..62ea6ec321f 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java @@ -18,23 +18,12 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.utils.Bytes; -import java.util.Map; - public class BytesSerializer implements Serializer { - - public void configure(Map configs, boolean isKey) { - // nothing to do - } - public byte[] serialize(String topic, Bytes data) { if (data == null) return null; return data.get(); } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java index bc1a714c35e..eb56485abce 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java @@ -37,7 +37,9 @@ public interface Deserializer extends Closeable { * @param configs configs in key/value pairs * @param isKey whether is for key or value */ - void configure(Map configs, boolean isKey); + default void configure(Map configs, boolean isKey) { + // intentionally left blank + } /** * Deserialize a record value from a byte array into a value or object. @@ -58,6 +60,13 @@ public interface Deserializer extends Closeable { return deserialize(topic, data); } + /** + * Close this deserializer. + *

+ * This method must be idempotent as it may be called multiple times. + */ @Override - void close(); + default void close() { + // intentionally left blank + } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java index 24f6007cb35..0fa1cce4d74 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java @@ -18,15 +18,8 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.errors.SerializationException; -import java.util.Map; - public class DoubleDeserializer implements Deserializer { - @Override - public void configure(Map configs, boolean isKey) { - // nothing to do - } - @Override public Double deserialize(String topic, byte[] data) { if (data == null) @@ -42,9 +35,4 @@ public class DoubleDeserializer implements Deserializer { } return Double.longBitsToDouble(value); } - - @Override - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java index 7dd4edc3b62..99781b53d0e 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java @@ -16,15 +16,7 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class DoubleSerializer implements Serializer { - - @Override - public void configure(Map configs, boolean isKey) { - // nothing to do - } - @Override public byte[] serialize(String topic, Double data) { if (data == null) @@ -42,9 +34,4 @@ public class DoubleSerializer implements Serializer { (byte) bits }; } - - @Override - public void close() { - // nothing to do - } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java index 3834ce20b07..09031779426 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java @@ -18,15 +18,7 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.errors.SerializationException; -import java.util.Map; - public class FloatDeserializer implements Deserializer { - - @Override - public void configure(final Map configs, final boolean isKey) { - // nothing to do - } - @Override public Float deserialize(final String topic, final byte[] data) { if (data == null) @@ -42,10 +34,4 @@ public class FloatDeserializer implements Deserializer { } return Float.intBitsToFloat(value); } - - @Override - public void close() { - // nothing to do - } - } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java index 6eb766dcd42..aa72d43a91e 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java @@ -16,15 +16,7 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class FloatSerializer implements Serializer { - - @Override - public void configure(final Map configs, final boolean isKey) { - // nothing to do - } - @Override public byte[] serialize(final String topic, final Float data) { if (data == null) @@ -38,9 +30,4 @@ public class FloatSerializer implements Serializer { (byte) bits }; } - - @Override - public void close() { - // nothing to do - } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java index 45f8cf18fd4..20ca63f0224 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java @@ -18,14 +18,7 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.errors.SerializationException; -import java.util.Map; - public class IntegerDeserializer implements Deserializer { - - public void configure(Map configs, boolean isKey) { - // nothing to do - } - public Integer deserialize(String topic, byte[] data) { if (data == null) return null; @@ -40,8 +33,4 @@ public class IntegerDeserializer implements Deserializer { } return value; } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java index f2144ceee70..8ab53104600 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java @@ -16,14 +16,7 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class IntegerSerializer implements Serializer { - - public void configure(Map configs, boolean isKey) { - // nothing to do - } - public byte[] serialize(String topic, Integer data) { if (data == null) return null; @@ -35,8 +28,4 @@ public class IntegerSerializer implements Serializer { data.byteValue() }; } - - public void close() { - // nothing to do - } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java index a58b1d38cf3..1e445d2452f 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java @@ -18,14 +18,7 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.errors.SerializationException; -import java.util.Map; - public class LongDeserializer implements Deserializer { - - public void configure(Map configs, boolean isKey) { - // nothing to do - } - public Long deserialize(String topic, byte[] data) { if (data == null) return null; @@ -40,8 +33,4 @@ public class LongDeserializer implements Deserializer { } return value; } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java index d37842c3914..436f0e01095 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java @@ -16,14 +16,7 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class LongSerializer implements Serializer { - - public void configure(Map configs, boolean isKey) { - // nothing to do - } - public byte[] serialize(String topic, Long data) { if (data == null) return null; @@ -39,8 +32,4 @@ public class LongSerializer implements Serializer { data.byteValue() }; } - - public void close() { - // nothing to do - } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java index fbcc7c2e0fc..5b052e69f69 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java @@ -34,14 +34,19 @@ public interface Serde extends Closeable { * @param configs configs in key/value pairs * @param isKey whether is for key or value */ - void configure(Map configs, boolean isKey); + default void configure(Map configs, boolean isKey) { + // intentionally left blank + } /** * Close this serde class, which will close the underlying serializer and deserializer. + *

* This method has to be idempotent because it might be called multiple times. */ @Override - void close(); + default void close() { + // intentionally left blank + } Serializer serializer(); diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java index c5d4760d381..144b5ab945e 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java @@ -37,7 +37,9 @@ public interface Serializer extends Closeable { * @param configs configs in key/value pairs * @param isKey whether is for key or value */ - void configure(Map configs, boolean isKey); + default void configure(Map configs, boolean isKey) { + // intentionally left blank + } /** * Convert {@code data} into a byte array. @@ -62,9 +64,11 @@ public interface Serializer extends Closeable { /** * Close this serializer. - * + *

* This method must be idempotent as it may be called multiple times. */ @Override - void close(); + default void close() { + // intentionally left blank + } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java index 45aa8ae7ae3..7814a7bd712 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java @@ -18,14 +18,8 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.errors.SerializationException; -import java.util.Map; - public class ShortDeserializer implements Deserializer { - public void configure(Map configs, boolean isKey) { - // nothing to do - } - public Short deserialize(String topic, byte[] data) { if (data == null) return null; @@ -40,8 +34,4 @@ public class ShortDeserializer implements Deserializer { } return value; } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java index a66aaa09685..e54354b4dea 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java @@ -16,14 +16,7 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class ShortSerializer implements Serializer { - - public void configure(Map configs, boolean isKey) { - // nothing to do - } - public byte[] serialize(String topic, Short data) { if (data == null) return null; @@ -33,8 +26,4 @@ public class ShortSerializer implements Serializer { data.byteValue() }; } - - public void close() { - // nothing to do - } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java index 0398a1b2cc1..68e6c409cba 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java @@ -49,9 +49,4 @@ public class StringDeserializer implements Deserializer { throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); } } - - @Override - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java index 28e41741a37..e16e19ac75c 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java @@ -49,9 +49,4 @@ public class StringSerializer implements Serializer { throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding); } } - - @Override - public void close() { - // nothing to do - } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java index a6eb2ea4757..e852fc95afd 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java @@ -52,9 +52,4 @@ public class UUIDDeserializer implements Deserializer { throw new SerializationException("Error parsing data into UUID", e); } } - - @Override - public void close() { - // do nothing - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java index d8e2524ad9f..908c202c748 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java @@ -50,9 +50,4 @@ public class UUIDSerializer implements Serializer { throw new SerializationException("Error when serializing UUID to byte[] due to unsupported encoding " + encoding); } } - - @Override - public void close() { - // nothing to do - } } diff --git a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java index 0c597c8b72a..1c144455080 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.ClusterResourceListener; import org.apache.kafka.common.ClusterResource; import org.apache.kafka.common.serialization.Serializer; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -35,10 +34,6 @@ public class MockSerializer implements ClusterResourceListener, Serializer configs, boolean isKey) { - } - @Override public byte[] serialize(String topic, byte[] data) { // This will ensure that we get the cluster metadata when serialize is called for the first time diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java index 8f2171bc4bc..b006e227215 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; -import java.util.Map; - /** * JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily * structured data without having associated Java classes. This deserializer also supports Connect schemas. @@ -36,9 +34,6 @@ public class JsonDeserializer implements Deserializer { public JsonDeserializer() { } - @Override - public void configure(Map props, boolean isKey) { - } @Override public JsonNode deserialize(String topic, byte[] bytes) { @@ -54,9 +49,4 @@ public class JsonDeserializer implements Deserializer { return data; } - - @Override - public void close() { - - } } diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java index 438daa17e6b..94ec0a83e8c 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; -import java.util.Map; - /** * Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily * structured data without corresponding Java classes. This serializer also supports Connect schemas. @@ -37,10 +35,6 @@ public class JsonSerializer implements Serializer { } - @Override - public void configure(Map config, boolean isKey) { - } - @Override public byte[] serialize(String topic, JsonNode data) { if (data == null) @@ -52,9 +46,4 @@ public class JsonSerializer implements Serializer { throw new SerializationException("Error serializing JSON message", e); } } - - @Override - public void close() { - } - } diff --git a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala index f94a9006c8c..7fb3cf30cc9 100644 --- a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala +++ b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala @@ -27,16 +27,11 @@ import org.junit.Test import org.scalatest.mockito.MockitoSugar class CustomDeserializer extends Deserializer[String] { - override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = { - } override def deserialize(topic: String, data: Array[Byte]): String = { assertThat("topic must not be null", topic, CoreMatchers.notNullValue) new String(data) } - - override def close(): Unit = { - } } class CustomDeserializerTest extends MockitoSugar { diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index b4d957a79e7..9071dc2eefa 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -73,6 +73,11 @@ As of 2.3.0 Streams now offers an in-memory version of the window store, in addition to the persistent one based on RocksDB. The new public interface inMemoryWindowStore() is added to Stores that provides a built-in in-memory window store.

+

+ In 2.3.0 we have added default implementation to close() and configure() for Serializer, Deserializer and Serde so that they can be + implemented by lambda expression. For more details please read KIP-331. +

+

Streams API changes in 2.2.0

We've simplified the KafkaStreams#state transition diagram during the starting up phase a bit in 2.2.0: in older versions the state will transit from CREATED to RUNNING, and then to REBALANCING to get the first diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java index 8b2a46b15c0..d13e4a84868 100644 --- a/streams/src/main/java/org/apache/kafka/streams/Topology.java +++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java @@ -304,6 +304,7 @@ public class Topology { * @return itself * @throws TopologyException if processor is already added or if topics have already been registered by name */ + @SuppressWarnings("overloads") public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, @@ -359,7 +360,7 @@ public class Topology { * @return itself * @throws TopologyException if processor is already added or if topics have already been registered by another source */ - + @SuppressWarnings("overloads") public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, @@ -391,6 +392,7 @@ public class Topology { * @return itself * @throws TopologyException if processor is already added or if topics have already been registered by name */ + @SuppressWarnings("overloads") public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java index 56193d570ec..36f77b81b23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Deserializer; import java.nio.ByteBuffer; -import java.util.Map; public class ChangedDeserializer implements Deserializer> { @@ -40,11 +39,6 @@ public class ChangedDeserializer implements Deserializer> { this.inner = inner; } - @Override - public void configure(final Map configs, final boolean isKey) { - // do nothing - } - @Override public Change deserialize(final String topic, final Headers headers, final byte[] data) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java index 7fa34b73d27..bfd0afa1b10 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.errors.StreamsException; import java.nio.ByteBuffer; -import java.util.Map; public class ChangedSerializer implements Serializer> { @@ -41,11 +40,6 @@ public class ChangedSerializer implements Serializer> { this.inner = inner; } - @Override - public void configure(final Map configs, final boolean isKey) { - // do nothing - } - /** * @throws StreamsException if both old and new values of data are null, or if * both values are not null diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index afebfdb0df4..2c9a97b4e49 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -645,9 +645,6 @@ public class StreamsConfigTest { throw new RuntimeException("boom"); } - @Override - public void close() {} - @Override public Serializer serializer() { return null; diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java index 86079020432..2cab626417b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java @@ -188,9 +188,6 @@ public class YahooBenchmark { @SuppressWarnings("WeakerAccess") public JsonPOJOSerializer() {} - @Override - public void configure(final Map props, final boolean isKey) {} - @Override public byte[] serialize(final String topic, final T data) { if (data == null) { @@ -203,10 +200,6 @@ public class YahooBenchmark { throw new SerializationException("Error serializing JSON message", e); } } - - @Override - public void close() {} - } // Note: these are also in the streams example package, eventuall use 1 file @@ -242,11 +235,6 @@ public class YahooBenchmark { return data; } - - @Override - public void close() { - - } } private KafkaStreams createYahooBenchmarkStreams(final Properties streamConfig, final String campaignsTopic, final String eventsTopic, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index 452dd7b71d8..08112a691d0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.test.MockSourceNode; import org.junit.Test; import java.nio.charset.StandardCharsets; -import java.util.Map; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -51,15 +50,9 @@ public class SourceNodeTest { return topic + headers + new String(data, StandardCharsets.UTF_8); } - @Override - public void configure(final Map configs, final boolean isKey) { } - @Override public String deserialize(final String topic, final byte[] data) { return deserialize(topic, null, data); } - - @Override - public void close() { } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java index bf1d030b9e0..03e0c3a9a53 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java @@ -22,19 +22,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import java.util.Map; - class SerdeThatDoesntHandleNull implements Serde { - @Override - public void configure(final Map configs, final boolean isKey) { - - } - - @Override - public void close() { - - } - @Override public Serializer serializer() { return new StringSerializer(); diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 9de77980077..6f6c51eddea 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -473,10 +473,6 @@ public class TopologyTestDriverTest { } return Serdes.Integer().serializer().serialize(topic, (Integer) data); } - @Override - public void close() {} - @Override - public void configure(final Map configs, final boolean isKey) {} }, new Serializer() { @Override @@ -486,10 +482,6 @@ public class TopologyTestDriverTest { } return Serdes.Double().serializer().serialize(topic, (Double) data); } - @Override - public void close() {} - @Override - public void configure(final Map configs, final boolean isKey) {} }, processor); diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java index 5b7e2287e95..887bdc4d77f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java +++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java @@ -339,21 +339,11 @@ public class ClientCompatibilityTest { this.expectClusterId = expectClusterId; } - @Override - public void configure(Map configs, boolean isKey) { - // nothing to do - } - @Override public byte[] deserialize(String topic, byte[] data) { return data; } - @Override - public void close() { - // nothing to do - } - @Override public void onUpdate(ClusterResource clusterResource) { if (expectClusterId) {