diff --git a/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java b/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java
new file mode 100644
index 00000000000..f3f3a941d20
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link Topology#addSource(AutoOffsetReset, String, String...) adding a source processor}
+ * or when creating {@link KStream} or {@link KTable} via {@link StreamsBuilder}.
+ */
+public class AutoOffsetReset {
+    protected final StrategyType offsetResetStrategy;
+    protected final Optional<Duration> duration;
+
+    private AutoOffsetReset(final StrategyType offsetResetStrategy, final Optional<Duration> duration) {
+        this.offsetResetStrategy = offsetResetStrategy;
+        this.duration = duration;
+    }
+
+    protected AutoOffsetReset(final AutoOffsetReset autoOffsetReset) {
+        this(autoOffsetReset.offsetResetStrategy, autoOffsetReset.duration);
+    }
+
+    /**
+     * Creates an {@code AutoOffsetReset} instance representing "none".
+     *
+     * @return An {@link AutoOffsetReset} instance for no reset.
+     */
+    public static AutoOffsetReset none() {
+        return new AutoOffsetReset(StrategyType.NONE, Optional.empty());
+    }
+
+    /**
+     * Creates an {@code AutoOffsetReset} instance representing "earliest".
+     *
+     * @return An {@link AutoOffsetReset} instance for the "earliest" offset.
+     */
+    public static AutoOffsetReset earliest() {
+        return new AutoOffsetReset(StrategyType.EARLIEST, Optional.empty());
+    }
+
+    /**
+     * Creates an {@code AutoOffsetReset} instance representing "latest".
+     *
+     * @return An {@code AutoOffsetReset} instance for the "latest" offset.
+     */
+    public static AutoOffsetReset latest() {
+        return new AutoOffsetReset(StrategyType.LATEST, Optional.empty());
+    }
+
+    /**
+     * Creates an {@code AutoOffsetReset} instance for the specified reset duration.
+     *
+     * @param duration The duration to use for the offset reset; must be non-negative.
+     * @return An {@code AutoOffsetReset} instance with the specified duration.
+     * @throws IllegalArgumentException If the duration is negative.
+     */
+    public static AutoOffsetReset byDuration(final Duration duration) {
+        if (duration.isNegative()) {
+            throw new IllegalArgumentException("Duration cannot be negative");
+        }
+        return new AutoOffsetReset(StrategyType.BY_DURATION, Optional.of(duration));
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final AutoOffsetReset that = (AutoOffsetReset) o;
+        return offsetResetStrategy == that.offsetResetStrategy && duration.equals(that.duration);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = offsetResetStrategy.hashCode();
+        result = 31 * result + duration.hashCode();
+        return result;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index b9cc75b9fde..7badb3016f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -144,7 +144,7 @@ public class StreamsBuilder {
      * @return a {@link KStream} for the specified topics
      */
     public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics) {
-        return stream(topics, Consumed.with(null, null, null, null));
+        return stream(topics, Consumed.with(null, null));
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 35fe13faa38..320e0babf77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -73,7 +73,10 @@ public class Topology {
      * Sets the {@code auto.offset.reset} configuration when
      * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream}
      * or {@link KTable} via {@link StreamsBuilder}.
+     *
+     * @deprecated Since 4.0. Use {@link org.apache.kafka.streams.AutoOffsetReset} instead.
      */
+    @Deprecated
     public enum AutoOffsetReset {
         EARLIEST, LATEST
     }
@@ -130,7 +133,9 @@ public class Topology {
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return itself
      * @throws TopologyException if processor is already added or if topics have already been registered by another source
+     * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, String...)} instead.
      */
+    @Deprecated
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final String... topics) {
@@ -138,6 +143,24 @@ public class Topology {
         return this;
     }
 
+    /**
+     * Adds a new source that consumes the specified topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the specified {@link org.apache.kafka.streams.AutoOffsetReset offset reset policy} if no committed offsets are found.
+     *
+     * @param offsetReset the auto offset reset policy to use for this source if no committed offsets are found
+     * @param name the unique name of the source used to reference this node when {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+     * @param topics the name of one or more Kafka topics that this source is to consume
+     * @return itself
+     * @throws TopologyException if a processor is already added or if topics have already been registered by another source
+     */
+    public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
+                                           final String name,
+                                           final String... topics) {
+        // TODO mjsax
+        //internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topics);
+        return this;
+    }
+
     /**
      * Add a new source that consumes from topics matching the given pattern
      * and forward the records to child processor and/or sink nodes.
@@ -152,7 +175,9 @@ public class Topology { * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume * @return itself * @throws TopologyException if processor is already added or if topics have already been registered by another source + * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, Pattern)} instead. */ + @Deprecated public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final Pattern topicPattern) { @@ -160,6 +185,29 @@ public class Topology { return this; } + /** + * Add a new source that consumes from topics matching the given pattern + * and forward the records to child processor and/or sink nodes. + * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and + * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * {@link StreamsConfig stream configuration}. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. + * + * @param offsetReset the auto offset reset policy value for this source if no committed offsets found + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. 
+ * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume + * @return itself + * @throws TopologyException if processor is already added or if topics have already been registered by another source + */ + public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, + final String name, + final Pattern topicPattern) { + // TODO: mjsax + //internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topicPattern); + return this; + } + /** * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes. * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and @@ -218,7 +266,9 @@ public class Topology { * @param topics the name of one or more Kafka topics that this source is to consume * @return itself * @throws TopologyException if processor is already added or if topics have already been registered by another source + * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, String, String...)} instead. */ + @Deprecated public synchronized Topology addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, @@ -227,6 +277,27 @@ public class Topology { return this; } + /** + * Adds a new source that consumes the specified topics with a specified {@link TimestampExtractor} + * and forwards the records to child processor and/or sink nodes. + * The source will use the provided timestamp extractor to determine the timestamp of each record. + * + * @param offsetReset the auto offset reset policy to use if no committed offsets are found + * @param timestampExtractor the timestamp extractor to use for this source + * @param name the unique name of the source used to reference this node when {@link #addProcessor(String, ProcessorSupplier, String...) 
adding processor children} + * @param topics the name of one or more Kafka topics that this source is to consume + * @return itself + * @throws TopologyException if a processor is already added or if topics have already been registered by another source + */ + public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, + final TimestampExtractor timestampExtractor, + final String name, + final String... topics) { + // TODO mjsax + //internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topics); + return this; + } + /** * Add a new source that consumes from topics matching the given pattern and forward the records to child processor * and/or sink nodes. @@ -243,7 +314,9 @@ public class Topology { * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume * @return itself * @throws TopologyException if processor is already added or if topics have already been registered by another source + * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, String, Pattern)} instead. */ + @Deprecated public synchronized Topology addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, @@ -252,6 +325,27 @@ public class Topology { return this; } + /** + * Adds a new source that consumes from topics matching the given pattern with a specified {@link TimestampExtractor} + * and forwards the records to child processor and/or sink nodes. + * The source will use the provided timestamp extractor to determine the timestamp of each record. + * + * @param offsetReset the auto offset reset policy to use if no committed offsets are found + * @param timestampExtractor the timestamp extractor to use for this source + * @param name the unique name of the source used to reference this node when {@link #addProcessor(String, ProcessorSupplier, String...) 
adding processor children} + * @param topicPattern the regular expression pattern to match Kafka topics that this source is to consume + * @return itself + * @throws TopologyException if a processor is already added or if topics have already been registered by another source + */ + public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, + final TimestampExtractor timestampExtractor, + final String name, + final Pattern topicPattern) { + // TODO + //internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topicPattern); + return this; + } + /** * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. * The source will use the specified key and value deserializers. @@ -319,8 +413,9 @@ public class Topology { * @param topics the name of one or more Kafka topics that this source is to consume * @return itself * @throws TopologyException if processor is already added or if topics have already been registered by name + * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, Deserializer, String...)} instead. */ - @SuppressWarnings("overloads") + @Deprecated public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, @@ -330,6 +425,34 @@ public class Topology { return this; } + /** + * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor + * and/or sink nodes. + * The source will use the specified key and value deserializers. + * The provided de-/serializers will be used for all the specified topics, so care should be taken when specifying + * topics that share the same key-value data format. 
+ * + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} + * @param keyDeserializer key deserializer used to read this source, if not specified the default + * key deserializer defined in the configs will be used + * @param valueDeserializer value deserializer used to read this source, + * if not specified the default value deserializer defined in the configs will be used + * @param topics the name of one or more Kafka topics that this source is to consume + * @return itself + * @throws TopologyException if processor is already added or if topics have already been registered by name + */ + public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, + final String name, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String... topics) { + // TODO mjsax + //internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topics); + return this; + } + /** * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor * and/or sink nodes. @@ -348,7 +471,9 @@ public class Topology { * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume * @return itself * @throws TopologyException if processor is already added or if topics have already been registered by name + * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, Deserializer, Pattern)} instead. 
*/ + @Deprecated public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, @@ -358,6 +483,34 @@ public class Topology { return this; } + /** + * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor + * and/or sink nodes. + * The source will use the specified key and value deserializers. + * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for + * topics that share the same key-value data format. + * + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} + * @param keyDeserializer key deserializer used to read this source, if not specified the default + * key deserializer defined in the configs will be used + * @param valueDeserializer value deserializer used to read this source, + * if not specified the default value deserializer defined in the configs will be used + * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume + * @return itself + * @throws TopologyException if processor is already added or if topics have already been registered by name + */ + public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, + final String name, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final Pattern topicPattern) { + // TODO mjsax + //internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topicPattern); + return this; + } + /** * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. * The source will use the specified key and value deserializers. 
@@ -375,8 +528,9 @@ public class Topology { * @param topics the name of one or more Kafka topics that this source is to consume * @return itself * @throws TopologyException if processor is already added or if topics have already been registered by another source + * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, TimestampExtractor, Deserializer, Deserializer, String...)} instead. */ - @SuppressWarnings("overloads") + @Deprecated public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, @@ -387,6 +541,34 @@ public class Topology { return this; } + /** + * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. + * The source will use the specified key and value deserializers. + * + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. 
+ * @param timestampExtractor the stateless timestamp extractor used for this source, + * if not specified the default extractor defined in the configs will be used + * @param keyDeserializer key deserializer used to read this source, if not specified the default + * key deserializer defined in the configs will be used + * @param valueDeserializer value deserializer used to read this source, + * if not specified the default value deserializer defined in the configs will be used + * @param topics the name of one or more Kafka topics that this source is to consume + * @return itself + * @throws TopologyException if processor is already added or if topics have already been registered by another source + */ + public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, + final String name, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String... topics) { + // TODO mjsax + //internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topics); + return this; + } + /** * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor * and/or sink nodes. @@ -407,8 +589,9 @@ public class Topology { * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume * @return itself * @throws TopologyException if processor is already added or if topics have already been registered by name + * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, TimestampExtractor, Deserializer, Deserializer, Pattern)} instead. 
*/ - @SuppressWarnings("overloads") + @Deprecated public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, @@ -419,6 +602,37 @@ public class Topology { return this; } + /** + * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor + * and/or sink nodes. + * The source will use the specified key and value deserializers. + * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for + * topics that share the same key-value data format. + * + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. + * @param timestampExtractor the stateless timestamp extractor used for this source, + * if not specified the default extractor defined in the configs will be used + * @param keyDeserializer key deserializer used to read this source, if not specified the default + * key deserializer defined in the configs will be used + * @param valueDeserializer value deserializer used to read this source, + * if not specified the default value deserializer defined in the configs will be used + * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume + * @return itself + * @throws TopologyException if processor is already added or if topics have already been registered by name + */ + public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, + final String name, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final Pattern topicPattern) { + // TODO mjsax + //internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, keyDeserializer, valueDeserializer, topicPattern); + return this; + } + /** * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic. * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java new file mode 100644 index 00000000000..51054ee2cae --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.internals; + +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy.StrategyType; +import org.apache.kafka.streams.AutoOffsetReset; + +import java.time.Duration; +import java.util.Optional; + +public class AutoOffsetResetInternal extends AutoOffsetReset { + + public AutoOffsetResetInternal(final AutoOffsetReset autoOffsetReset) { + super(autoOffsetReset); + } + + public StrategyType offsetResetStrategy() { + return offsetResetStrategy; + } + public Optional duration() { + return duration; + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java index d1713ab20a1..046ff336fcc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.AutoOffsetReset; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.TimestampExtractor; @@ -55,30 +56,79 @@ public class Consumed implements NamedOperation> { protected Serde keySerde; protected Serde valueSerde; protected TimestampExtractor timestampExtractor; - protected Topology.AutoOffsetReset resetPolicy; + @Deprecated + protected Topology.AutoOffsetReset legacyResetPolicy; + protected AutoOffsetReset resetPolicy; protected String processorName; + @SuppressWarnings("deprecation") private Consumed(final Serde keySerde, final Serde valueSerde, final TimestampExtractor timestampExtractor, - final Topology.AutoOffsetReset resetPolicy, + final Topology.AutoOffsetReset legacyResetPolicy, + final AutoOffsetReset resetPolicy, final String processorName) { this.keySerde = keySerde; this.valueSerde = valueSerde; this.timestampExtractor 
= timestampExtractor; + this.legacyResetPolicy = legacyResetPolicy; this.resetPolicy = resetPolicy; this.processorName = processorName; } + /** + * Create an instance of {@link Consumed} from an existing instance. + * @param consumed the instance of {@link Consumed} to copy + */ protected Consumed(final Consumed consumed) { - this(consumed.keySerde, - consumed.valueSerde, - consumed.timestampExtractor, - consumed.resetPolicy, - consumed.processorName + this( + consumed.keySerde, + consumed.valueSerde, + consumed.timestampExtractor, + consumed.legacyResetPolicy, + consumed.resetPolicy, + consumed.processorName ); } + @Deprecated + private static AutoOffsetReset convertOldToNew(final Topology.AutoOffsetReset resetPolicy) { + if (resetPolicy == null) { + return null; + } + + return resetPolicy == org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST + ? AutoOffsetReset.earliest() + : AutoOffsetReset.latest(); + } + + /** + * Create an instance of {@link Consumed} with the supplied arguments. {@code null} values are acceptable. + * + * @param keySerde + * the key serde. If {@code null} the default key serde from config will be used + * @param valueSerde + * the value serde. If {@code null} the default value serde from config will be used + * @param timestampExtractor + * the timestamp extractor to used. If {@code null} the default timestamp extractor from config will be used + * @param resetPolicy + * the offset reset policy to be used. If {@code null} the default reset policy from config will be used + * + * @param key type + * @param value type + * + * @return a new instance of {@link Consumed} + * + * @deprecated Since 4.0. Use {@link #with(Serde, Serde, TimestampExtractor, AutoOffsetReset)} instead. 
+ */ + @Deprecated + public static Consumed with(final Serde keySerde, + final Serde valueSerde, + final TimestampExtractor timestampExtractor, + final Topology.AutoOffsetReset resetPolicy) { + return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, convertOldToNew(resetPolicy), null); + } + /** * Create an instance of {@link Consumed} with the supplied arguments. {@code null} values are acceptable. * @@ -99,8 +149,8 @@ public class Consumed implements NamedOperation> { public static Consumed with(final Serde keySerde, final Serde valueSerde, final TimestampExtractor timestampExtractor, - final Topology.AutoOffsetReset resetPolicy) { - return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, null); + final AutoOffsetReset resetPolicy) { + return new Consumed<>(keySerde, valueSerde, timestampExtractor, null, resetPolicy, null); } /** @@ -118,7 +168,7 @@ public class Consumed implements NamedOperation> { */ public static Consumed with(final Serde keySerde, final Serde valueSerde) { - return new Consumed<>(keySerde, valueSerde, null, null, null); + return new Consumed<>(keySerde, valueSerde, null, null, null, null); } /** @@ -133,7 +183,25 @@ public class Consumed implements NamedOperation> { * @return a new instance of {@link Consumed} */ public static Consumed with(final TimestampExtractor timestampExtractor) { - return new Consumed<>(null, null, timestampExtractor, null, null); + return new Consumed<>(null, null, timestampExtractor, null, null, null); + } + + /** + * Create an instance of {@link Consumed} with a {@link org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}. + * + * @param resetPolicy + * the offset reset policy to be used. If {@code null} the default reset policy from config will be used + * + * @param key type + * @param value type + * + * @return a new instance of {@link Consumed} + * + * @deprecated Since 4.0. Use {@link #with(AutoOffsetReset)} instead. 
+ */ + @Deprecated + public static Consumed with(final Topology.AutoOffsetReset resetPolicy) { + return new Consumed<>(null, null, null, resetPolicy, convertOldToNew(resetPolicy), null); } /** @@ -147,8 +215,8 @@ public class Consumed implements NamedOperation> { * * @return a new instance of {@link Consumed} */ - public static Consumed with(final Topology.AutoOffsetReset resetPolicy) { - return new Consumed<>(null, null, null, resetPolicy, null); + public static Consumed with(final AutoOffsetReset resetPolicy) { + return new Consumed<>(null, null, null, null, resetPolicy, null); } /** @@ -163,7 +231,7 @@ public class Consumed implements NamedOperation> { * @return a new instance of {@link Consumed} */ public static Consumed as(final String processorName) { - return new Consumed<>(null, null, null, null, processorName); + return new Consumed<>(null, null, null, null, null, processorName); } /** @@ -175,7 +243,7 @@ public class Consumed implements NamedOperation> { * @return a new instance of {@link Consumed} */ public Consumed withKeySerde(final Serde keySerde) { - return new Consumed(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName); + return new Consumed<>(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy, processorName); } /** @@ -187,7 +255,7 @@ public class Consumed implements NamedOperation> { * @return a new instance of {@link Consumed} */ public Consumed withValueSerde(final Serde valueSerde) { - return new Consumed(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName); + return new Consumed<>(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy, processorName); } /** @@ -199,7 +267,29 @@ public class Consumed implements NamedOperation> { * @return a new instance of {@link Consumed} */ public Consumed withTimestampExtractor(final TimestampExtractor timestampExtractor) { - return new Consumed(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName); + return new 
Consumed<>(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy, processorName); + } + + /** + * Configure the instance of {@link Consumed} with a {@link org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}. + * + * @param resetPolicy + * the offset reset policy to be used. If {@code null} the default reset policy from config will be used + * + * @return a new instance of {@link Consumed} + * + * @deprecated Since 4.0. Use {@link #withOffsetResetPolicy(AutoOffsetReset)} instead. + */ + @Deprecated + public Consumed withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy) { + return new Consumed<>( + keySerde, + valueSerde, + timestampExtractor, + resetPolicy, + convertOldToNew(resetPolicy), + processorName + ); } /** @@ -210,8 +300,8 @@ public class Consumed implements NamedOperation> { * * @return a new instance of {@link Consumed} */ - public Consumed withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy) { - return new Consumed(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName); + public Consumed withOffsetResetPolicy(final AutoOffsetReset resetPolicy) { + return new Consumed<>(keySerde, valueSerde, timestampExtractor, null, resetPolicy, processorName); } /** @@ -224,7 +314,7 @@ public class Consumed implements NamedOperation> { */ @Override public Consumed withName(final String processorName) { - return new Consumed(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName); + return new Consumed<>(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy, processorName); } @Override @@ -239,11 +329,12 @@ public class Consumed implements NamedOperation> { return Objects.equals(keySerde, consumed.keySerde) && Objects.equals(valueSerde, consumed.valueSerde) && Objects.equals(timestampExtractor, consumed.timestampExtractor) && + legacyResetPolicy == consumed.legacyResetPolicy && resetPolicy == consumed.resetPolicy; } @Override public int hashCode() { - 
return Objects.hash(keySerde, valueSerde, timestampExtractor, resetPolicy); + return Objects.hash(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java index 40bd53a0b8c..3f5f63c7b77 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.internals.AutoOffsetResetInternal; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.processor.TimestampExtractor; @@ -28,11 +29,10 @@ public class ConsumedInternal extends Consumed { super(consumed); } - public ConsumedInternal(final Serde keySerde, final Serde valueSerde, final TimestampExtractor timestampExtractor, - final Topology.AutoOffsetReset offsetReset) { + final AutoOffsetResetInternal offsetReset) { this(Consumed.with(keySerde, valueSerde, timestampExtractor, offsetReset)); } @@ -60,8 +60,14 @@ public class ConsumedInternal extends Consumed { return timestampExtractor; } - public Topology.AutoOffsetReset offsetResetPolicy() { - return resetPolicy; + public AutoOffsetResetInternal offsetResetPolicy() { + return resetPolicy == null ? 
null : new AutoOffsetResetInternal(resetPolicy); + } + + @SuppressWarnings("deprecation") + // TODO mjsax remove + public Topology.AutoOffsetReset legacyOffsetResetPolicy() { + return legacyResetPolicy; } public String name() { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java index 97b686eaff6..b13477c546e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream.internals.graph; -import org.apache.kafka.streams.Topology.AutoOffsetReset; +import org.apache.kafka.streams.AutoOffsetReset; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; @@ -73,14 +73,16 @@ public class StreamSourceNode extends SourceGraphNode { public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { if (topicPattern().isPresent()) { - topologyBuilder.addSource(consumedInternal().offsetResetPolicy(), + // TODO mjsax + topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(), nodeName(), consumedInternal().timestampExtractor(), consumedInternal().keyDeserializer(), consumedInternal().valueDeserializer(), topicPattern().get()); } else { - topologyBuilder.addSource(consumedInternal().offsetResetPolicy(), + // TODO mjsax + topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(), nodeName(), consumedInternal().timestampExtractor(), consumedInternal().keyDeserializer(), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java index 5e776a5c733..81b44703a2d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java @@ -96,7 +96,8 @@ public class TableSourceNode extends SourceGraphNode { false ); } else { - topologyBuilder.addSource(consumedInternal().offsetResetPolicy(), + // TODO mjsax + topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(), sourceName, consumedInternal().timestampExtractor(), consumedInternal().keyDeserializer(), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index eeb076fc0cf..23bdcfde6f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -438,6 +438,7 @@ public class InternalTopologyBuilder { return this; } + @SuppressWarnings("deprecation") public final void addSource(final Topology.AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, @@ -465,6 +466,7 @@ public class InternalTopologyBuilder { nodeGroups = null; } + @SuppressWarnings("deprecation") public final void addSource(final Topology.AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, @@ -915,6 +917,7 @@ public class InternalTopologyBuilder { } + @SuppressWarnings("deprecation") private void maybeAddToResetList(final Collection earliestResets, final Collection latestResets, final Topology.AutoOffsetReset offsetReset, diff --git a/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java 
b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java new file mode 100644 index 00000000000..2dad17cd81f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/AutoOffsetResetTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams; + +import org.apache.kafka.streams.internals.AutoOffsetResetInternal; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class AutoOffsetResetTest { + + @Test + void latestShouldReturnAnEmptyDuration() { + final AutoOffsetResetInternal latest = new AutoOffsetResetInternal(AutoOffsetReset.latest()); + assertTrue(latest.duration().isEmpty(), "Latest should have an empty duration."); + } + + @Test + void earliestShouldReturnAnEmptyDuration() { + final AutoOffsetResetInternal earliest = new AutoOffsetResetInternal(AutoOffsetReset.earliest()); + assertTrue(earliest.duration().isEmpty(), "Earliest should have an empty duration."); + } + + @Test + void customDurationShouldMatchExpectedValue() { + final Duration duration = Duration.ofSeconds(10L); + final AutoOffsetResetInternal custom = new AutoOffsetResetInternal(AutoOffsetReset.byDuration(duration)); + assertEquals(10L, custom.duration().get().toSeconds(), "Duration should match the specified value in milliseconds."); + } + + @Test + void shouldThrowExceptionIfDurationIsNegative() { + final IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + () -> AutoOffsetReset.byDuration(Duration.ofSeconds(-1)), + "Creating an AutoOffsetReset with a negative duration should throw an IllegalArgumentException." 
+ ); + assertEquals("Duration cannot be negative", exception.getMessage(), "Exception message should indicate the duration cannot be negative."); + } + + @Test + void twoInstancesCreatedAtTheSameTimeWithSameOptionsShouldBeEqual() { + final AutoOffsetReset latest1 = AutoOffsetReset.latest(); + final AutoOffsetReset latest2 = AutoOffsetReset.latest(); + final AutoOffsetReset earliest1 = AutoOffsetReset.earliest(); + final AutoOffsetReset earliest2 = AutoOffsetReset.earliest(); + final AutoOffsetReset custom1 = AutoOffsetReset.byDuration(Duration.ofSeconds(5)); + final AutoOffsetReset custom2 = AutoOffsetReset.byDuration(Duration.ofSeconds(5)); + final AutoOffsetReset customDifferent = AutoOffsetReset.byDuration(Duration.ofSeconds(10)); + + // Equals + assertEquals(latest1, latest2, "Two latest instances should be equal."); + assertEquals(earliest1, earliest2, "Two earliest instances should be equal."); + assertEquals(custom1, custom2, "Two custom instances with the same duration should be equal."); + assertNotEquals(latest1, earliest1, "Latest and earliest should not be equal."); + assertNotEquals(custom1, customDifferent, "Custom instances with different durations should not be equal."); + + // HashCode + assertEquals(latest1.hashCode(), latest2.hashCode(), "HashCode for equal instances should be the same."); + assertEquals(custom1.hashCode(), custom2.hashCode(), "HashCode for equal custom instances should be the same."); + assertNotEquals(custom1.hashCode(), customDifferent.hashCode(), "HashCode for different custom instances should not match."); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index f43747d3cf2..0dc0179c6e5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -2257,7 +2257,7 @@ public class TopologyTest { private TopologyDescription.Source 
addSource(final String sourceName, final String... sourceTopic) { - topology.addSource(null, sourceName, null, null, null, sourceTopic); + topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, null, null, sourceTopic); final StringBuilder allSourceTopics = new StringBuilder(sourceTopic[0]); for (int i = 1; i < sourceTopic.length; ++i) { allSourceTopics.append(", ").append(sourceTopic[i]); @@ -2267,7 +2267,7 @@ public class TopologyTest { private TopologyDescription.Source addSource(final String sourceName, final Pattern sourcePattern) { - topology.addSource(null, sourceName, null, null, null, sourcePattern); + topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, null, null, sourcePattern); return new InternalTopologyBuilder.Source(sourceName, null, sourcePattern); } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala index 9a8034bac5a..89f461a8fea 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala @@ -18,7 +18,7 @@ package org.apache.kafka.streams.scala.kstream import org.apache.kafka.common.serialization.Serde import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ} -import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.{AutoOffsetReset, Topology} import org.apache.kafka.streams.processor.TimestampExtractor object Consumed { @@ -36,12 +36,32 @@ object Consumed { * @param valueSerde the value serde to use. 
 * @return a new instance of [[Consumed]] */ + @deprecated("Use `with` method that accepts `AutoOffsetReset` instead", "4.0.0") def `with`[K, V]( timestampExtractor: TimestampExtractor, resetPolicy: Topology.AutoOffsetReset )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] = ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy) + /** + * Create an instance of [[Consumed]] with the supplied arguments. `null` values are acceptable. + * + * @tparam K key type + * @tparam V value type + * @param timestampExtractor the timestamp extractor to be used. If `null` the default timestamp extractor from + * config will be used + * @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config + * will be used + * @param keySerde the key serde to use. + * @param valueSerde the value serde to use. + * @return a new instance of [[Consumed]] + */ + def `with`[K, V]( + timestampExtractor: TimestampExtractor, + resetPolicy: AutoOffsetReset + )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] = + ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy) + /** * Create an instance of [[Consumed]] with key and value Serdes. * @@ -74,8 +94,22 @@ object Consumed { * @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config will be used * @return a new instance of [[Consumed]] */ + @deprecated("Use `with` method that accepts `AutoOffsetReset` instead", "4.0.0") def `with`[K, V]( resetPolicy: Topology.AutoOffsetReset )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] = ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde) + + /** + * Create an instance of [[Consumed]] with a `org.apache.kafka.streams.AutoOffsetReset`. + * + * @tparam K key type + * @tparam V value type + * @param resetPolicy the offset reset policy to be used.
If `null` the default reset policy from config will be used + * @return a new instance of [[Consumed]] + */ + def `with`[K, V]( + resetPolicy: AutoOffsetReset + )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] = + ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde) } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala index 0b44165164b..4656a4d12fc 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala @@ -16,7 +16,8 @@ */ package org.apache.kafka.streams.scala.kstream -import org.apache.kafka.streams.Topology +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy +import org.apache.kafka.streams.AutoOffsetReset import org.apache.kafka.streams.kstream.internals.ConsumedInternal import org.apache.kafka.streams.processor.FailOnInvalidTimestamp import org.apache.kafka.streams.scala.serialization.Serdes @@ -38,15 +39,15 @@ class ConsumedTest { @Test def testCreateConsumedWithTimestampExtractorAndResetPolicy(): Unit = { val timestampExtractor = new FailOnInvalidTimestamp() - val resetPolicy = Topology.AutoOffsetReset.LATEST + val resetPolicy = AutoOffsetReset.latest() val consumed: Consumed[String, Long] = - Consumed.`with`[String, Long](timestampExtractor, resetPolicy) + Consumed.`with`(timestampExtractor, resetPolicy) val internalConsumed = new ConsumedInternal(consumed) assertEquals(Serdes.stringSerde.getClass, internalConsumed.keySerde.getClass) assertEquals(Serdes.longSerde.getClass, internalConsumed.valueSerde.getClass) assertEquals(timestampExtractor, internalConsumed.timestampExtractor) - assertEquals(resetPolicy, internalConsumed.offsetResetPolicy) + 
assertEquals(AutoOffsetResetStrategy.StrategyType.LATEST, internalConsumed.offsetResetPolicy.offsetResetStrategy()) } @Test @@ -59,14 +60,15 @@ class ConsumedTest { assertEquals(Serdes.longSerde.getClass, internalConsumed.valueSerde.getClass) assertEquals(timestampExtractor, internalConsumed.timestampExtractor) } + @Test def testCreateConsumedWithResetPolicy(): Unit = { - val resetPolicy = Topology.AutoOffsetReset.LATEST + val resetPolicy = AutoOffsetReset.latest() val consumed: Consumed[String, Long] = Consumed.`with`[String, Long](resetPolicy) val internalConsumed = new ConsumedInternal(consumed) assertEquals(Serdes.stringSerde.getClass, internalConsumed.keySerde.getClass) assertEquals(Serdes.longSerde.getClass, internalConsumed.valueSerde.getClass) - assertEquals(resetPolicy, internalConsumed.offsetResetPolicy) + assertEquals(AutoOffsetResetStrategy.StrategyType.LATEST, internalConsumed.offsetResetPolicy.offsetResetStrategy()) } }