KAFKA-18015: Add support for duration based offset reset strategy to Kafka Streams (#17973)

Part of KIP-1106.

Adds the public APIs to Kafka Streams, to support the newly added "by_duration" reset policy,
plus adds the missing "none" reset policy. Deprecates the enum `Topology.AutoOffsetReset` and
all related methods, and replaces them with new overloads using the new `AutoOffsetReset` class.

Co-authored-by: Matthias J. Sax <matthias@confluent.io>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
KApolinario1120 2024-12-11 12:47:25 -06:00 committed by GitHub
parent 156d551603
commit d83f09d014
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 618 additions and 42 deletions

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy.StrategyType;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
/**
 * Sets the {@code auto.offset.reset} configuration when
 * {@link Topology#addSource(AutoOffsetReset, String, String...) adding a source processor}
 * or when creating {@link KStream} or {@link KTable} via {@link StreamsBuilder}.
 */
public class AutoOffsetReset {
    // Protected (not private) so the internal subclass can expose them without
    // widening this public API; see AutoOffsetResetInternal.
    protected final StrategyType offsetResetStrategy;
    // Only present for the BY_DURATION strategy; empty for NONE/EARLIEST/LATEST.
    protected final Optional<Duration> duration;

    private AutoOffsetReset(final StrategyType offsetResetStrategy, final Optional<Duration> duration) {
        this.offsetResetStrategy = offsetResetStrategy;
        this.duration = duration;
    }

    /**
     * Copy constructor, intended for internal subclasses that need access to the
     * underlying strategy and duration.
     *
     * @param autoOffsetReset the instance to copy
     */
    protected AutoOffsetReset(final AutoOffsetReset autoOffsetReset) {
        this(autoOffsetReset.offsetResetStrategy, autoOffsetReset.duration);
    }

    /**
     * Creates an {@code AutoOffsetReset} instance representing "none".
     *
     * @return An {@code AutoOffsetReset} instance for no reset.
     */
    public static AutoOffsetReset none() {
        return new AutoOffsetReset(StrategyType.NONE, Optional.empty());
    }

    /**
     * Creates an {@code AutoOffsetReset} instance representing "earliest".
     *
     * @return An {@code AutoOffsetReset} instance for the "earliest" offset.
     */
    public static AutoOffsetReset earliest() {
        return new AutoOffsetReset(StrategyType.EARLIEST, Optional.empty());
    }

    /**
     * Creates an {@code AutoOffsetReset} instance representing "latest".
     *
     * @return An {@code AutoOffsetReset} instance for the "latest" offset.
     */
    public static AutoOffsetReset latest() {
        return new AutoOffsetReset(StrategyType.LATEST, Optional.empty());
    }

    /**
     * Creates an {@code AutoOffsetReset} instance for the specified reset duration.
     *
     * @param duration The duration to use for the offset reset; must be non-negative.
     * @return An {@code AutoOffsetReset} instance with the specified duration.
     * @throws NullPointerException If the duration is {@code null}.
     * @throws IllegalArgumentException If the duration is negative.
     */
    public static AutoOffsetReset byDuration(final Duration duration) {
        // Explicit null check so callers get a descriptive message instead of a
        // bare NPE out of duration.isNegative().
        Objects.requireNonNull(duration, "duration cannot be null");
        if (duration.isNegative()) {
            throw new IllegalArgumentException("Duration cannot be negative");
        }
        return new AutoOffsetReset(StrategyType.BY_DURATION, Optional.of(duration));
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final AutoOffsetReset that = (AutoOffsetReset) o;
        return offsetResetStrategy == that.offsetResetStrategy && duration.equals(that.duration);
    }

    @Override
    public int hashCode() {
        return Objects.hash(offsetResetStrategy, duration);
    }
}

View File

@ -144,7 +144,7 @@ public class StreamsBuilder {
* @return a {@link KStream} for the specified topics
*/
public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics) {
return stream(topics, Consumed.with(null, null, null, null));
return stream(topics, Consumed.with(null, null));
}
/**

View File

@ -73,7 +73,10 @@ public class Topology {
* Sets the {@code auto.offset.reset} configuration when
* {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream}
* or {@link KTable} via {@link StreamsBuilder}.
*
* @deprecated Since 4.0. Use {@link org.apache.kafka.streams.AutoOffsetReset} instead.
*/
@Deprecated
public enum AutoOffsetReset {
EARLIEST, LATEST
}
@ -130,7 +133,9 @@ public class Topology {
* @param topics the name of one or more Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by another source
* @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, String...)} instead.
*/
@Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final String... topics) {
@ -138,6 +143,24 @@ public class Topology {
return this;
}
/**
* Adds a new source that consumes the specified topics and forwards the records to child processor and/or sink nodes.
* The source will use the specified {@link org.apache.kafka.streams.AutoOffsetReset offset reset policy} if no committed offsets are found.
*
* @param offsetReset the auto offset reset policy to use for this source if no committed offsets are found
* @param name the unique name of the source used to reference this node when {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
* @param topics the name of one or more Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if a processor is already added or if topics have already been registered by another source
*/
public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                       final String name,
                                       final String... topics) {
    // TODO mjsax: NOT IMPLEMENTED YET -- this overload silently ignores all
    // arguments and registers no source. Intended wiring (pending
    // InternalTopologyBuilder support for the new AutoOffsetReset type):
    //internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topics);
    return this;
}
/**
* Add a new source that consumes from topics matching the given pattern
* and forward the records to child processor and/or sink nodes.
@ -152,7 +175,9 @@ public class Topology {
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by another source
* @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, Pattern)} instead.
*/
@Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final Pattern topicPattern) {
@ -160,6 +185,29 @@ public class Topology {
return this;
}
/**
* Add a new source that consumes from topics matching the given pattern
* and forward the records to child processor and/or sink nodes.
* The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
* {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
* {@link StreamsConfig stream configuration}.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
*
* @param offsetReset the auto offset reset policy value for this source if no committed offsets found
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by another source
*/
public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                       final String name,
                                       final Pattern topicPattern) {
    // TODO mjsax: NOT IMPLEMENTED YET -- this overload silently ignores all
    // arguments and registers no source. Intended wiring (pending
    // InternalTopologyBuilder support for the new AutoOffsetReset type):
    //internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topicPattern);
    return this;
}
/**
* Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes.
* The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
@ -218,7 +266,9 @@ public class Topology {
* @param topics the name of one or more Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by another source
* @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, String, String...)} instead.
*/
@Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final TimestampExtractor timestampExtractor,
final String name,
@ -227,6 +277,27 @@ public class Topology {
return this;
}
/**
* Adds a new source that consumes the specified topics with a specified {@link TimestampExtractor}
* and forwards the records to child processor and/or sink nodes.
* The source will use the provided timestamp extractor to determine the timestamp of each record.
*
* @param offsetReset the auto offset reset policy to use if no committed offsets are found
* @param timestampExtractor the timestamp extractor to use for this source
* @param name the unique name of the source used to reference this node when {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
* @param topics the name of one or more Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if a processor is already added or if topics have already been registered by another source
*/
public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                       final TimestampExtractor timestampExtractor,
                                       final String name,
                                       final String... topics) {
    // TODO mjsax: NOT IMPLEMENTED YET -- this overload silently ignores all
    // arguments and registers no source. Intended wiring (pending
    // InternalTopologyBuilder support for the new AutoOffsetReset type):
    //internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topics);
    return this;
}
/**
* Add a new source that consumes from topics matching the given pattern and forward the records to child processor
* and/or sink nodes.
@ -243,7 +314,9 @@ public class Topology {
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by another source
* @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, String, Pattern)} instead.
*/
@Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final TimestampExtractor timestampExtractor,
final String name,
@ -252,6 +325,27 @@ public class Topology {
return this;
}
/**
* Adds a new source that consumes from topics matching the given pattern with a specified {@link TimestampExtractor}
* and forwards the records to child processor and/or sink nodes.
* The source will use the provided timestamp extractor to determine the timestamp of each record.
*
* @param offsetReset the auto offset reset policy to use if no committed offsets are found
* @param timestampExtractor the timestamp extractor to use for this source
* @param name the unique name of the source used to reference this node when {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
* @param topicPattern the regular expression pattern to match Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if a processor is already added or if topics have already been registered by another source
*/
public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                       final TimestampExtractor timestampExtractor,
                                       final String name,
                                       final Pattern topicPattern) {
    // TODO mjsax: NOT IMPLEMENTED YET -- this overload silently ignores all
    // arguments and registers no source. Intended wiring (pending
    // InternalTopologyBuilder support for the new AutoOffsetReset type):
    //internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topicPattern);
    return this;
}
/**
* Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
* The source will use the specified key and value deserializers.
@ -319,8 +413,9 @@ public class Topology {
* @param topics the name of one or more Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by name
* @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, Deserializer, String...)} instead.
*/
@SuppressWarnings("overloads")
@Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final Deserializer<?> keyDeserializer,
@ -330,6 +425,34 @@ public class Topology {
return this;
}
/**
* Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
* and/or sink nodes.
* The source will use the specified key and value deserializers.
* The provided de-/serializers will be used for all the specified topics, so care should be taken when specifying
* topics that share the same key-value data format.
*
* @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
* @param keyDeserializer key deserializer used to read this source, if not specified the default
* key deserializer defined in the configs will be used
* @param valueDeserializer value deserializer used to read this source,
* if not specified the default value deserializer defined in the configs will be used
* @param topics the name of one or more Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by name
*/
public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                       final String name,
                                       final Deserializer<?> keyDeserializer,
                                       final Deserializer<?> valueDeserializer,
                                       final String... topics) {
    // TODO mjsax: NOT IMPLEMENTED YET -- this overload silently ignores all
    // arguments and registers no source. Intended wiring (pending
    // InternalTopologyBuilder support for the new AutoOffsetReset type):
    //internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topics);
    return this;
}
/**
* Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
* and/or sink nodes.
@ -348,7 +471,9 @@ public class Topology {
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by name
* @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, Deserializer, Pattern)} instead.
*/
@Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final Deserializer<?> keyDeserializer,
@ -358,6 +483,34 @@ public class Topology {
return this;
}
/**
* Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
* and/or sink nodes.
* The source will use the specified key and value deserializers.
* The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
* topics that share the same key-value data format.
*
* @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
* @param keyDeserializer key deserializer used to read this source, if not specified the default
* key deserializer defined in the configs will be used
* @param valueDeserializer value deserializer used to read this source,
* if not specified the default value deserializer defined in the configs will be used
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by name
*/
public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                       final String name,
                                       final Deserializer<?> keyDeserializer,
                                       final Deserializer<?> valueDeserializer,
                                       final Pattern topicPattern) {
    // TODO mjsax: NOT IMPLEMENTED YET -- this overload silently ignores all
    // arguments and registers no source. Intended wiring (pending
    // InternalTopologyBuilder support for the new AutoOffsetReset type):
    //internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topicPattern);
    return this;
}
/**
* Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
* The source will use the specified key and value deserializers.
@ -375,8 +528,9 @@ public class Topology {
* @param topics the name of one or more Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by another source
* @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, TimestampExtractor, Deserializer, Deserializer, String...)} instead.
*/
@SuppressWarnings("overloads")
@Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
@ -387,6 +541,34 @@ public class Topology {
return this;
}
/**
* Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
* The source will use the specified key and value deserializers.
*
* @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
* @param timestampExtractor the stateless timestamp extractor used for this source,
* if not specified the default extractor defined in the configs will be used
* @param keyDeserializer key deserializer used to read this source, if not specified the default
* key deserializer defined in the configs will be used
* @param valueDeserializer value deserializer used to read this source,
* if not specified the default value deserializer defined in the configs will be used
* @param topics the name of one or more Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by another source
*/
public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                       final String name,
                                       final TimestampExtractor timestampExtractor,
                                       final Deserializer<?> keyDeserializer,
                                       final Deserializer<?> valueDeserializer,
                                       final String... topics) {
    // TODO mjsax: NOT IMPLEMENTED YET -- this overload silently ignores all
    // arguments and registers no source. Intended wiring (pending
    // InternalTopologyBuilder support for the new AutoOffsetReset type):
    //internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topics);
    return this;
}
/**
* Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
* and/or sink nodes.
@ -407,8 +589,9 @@ public class Topology {
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by name
* @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, TimestampExtractor, Deserializer, Deserializer, Pattern)} instead.
*/
@SuppressWarnings("overloads")
@Deprecated
public synchronized Topology addSource(final AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
@ -419,6 +602,37 @@ public class Topology {
return this;
}
/**
* Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
* and/or sink nodes.
* The source will use the specified key and value deserializers.
* The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
* topics that share the same key-value data format.
*
* @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
* @param timestampExtractor the stateless timestamp extractor used for this source,
* if not specified the default extractor defined in the configs will be used
* @param keyDeserializer key deserializer used to read this source, if not specified the default
* key deserializer defined in the configs will be used
* @param valueDeserializer value deserializer used to read this source,
* if not specified the default value deserializer defined in the configs will be used
* @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
* @return itself
* @throws TopologyException if processor is already added or if topics have already been registered by name
*/
public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
                                       final String name,
                                       final TimestampExtractor timestampExtractor,
                                       final Deserializer<?> keyDeserializer,
                                       final Deserializer<?> valueDeserializer,
                                       final Pattern topicPattern) {
    // TODO mjsax: NOT IMPLEMENTED YET -- this overload silently ignores all
    // arguments and registers no source. Intended wiring (pending
    // InternalTopologyBuilder support for the new AutoOffsetReset type):
    //internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
    return this;
}
/**
* Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy.StrategyType;
import org.apache.kafka.streams.AutoOffsetReset;
import java.time.Duration;
import java.util.Optional;
/**
 * Internal accessor for {@link AutoOffsetReset}, which deliberately keeps its
 * strategy and duration fields non-public. Wrapping a public instance via the
 * protected copy constructor makes both values readable inside the streams
 * internals without widening the public API.
 */
public class AutoOffsetResetInternal extends AutoOffsetReset {

    public AutoOffsetResetInternal(final AutoOffsetReset autoOffsetReset) {
        super(autoOffsetReset);
    }

    /**
     * @return the underlying consumer offset-reset strategy type
     */
    public StrategyType offsetResetStrategy() {
        return offsetResetStrategy;
    }

    /**
     * @return the reset duration; only present for the "by_duration" strategy,
     *         empty otherwise
     */
    public Optional<Duration> duration() {
        return duration;
    }
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.AutoOffsetReset;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.TimestampExtractor;
@ -55,30 +56,79 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
protected Serde<K> keySerde;
protected Serde<V> valueSerde;
protected TimestampExtractor timestampExtractor;
protected Topology.AutoOffsetReset resetPolicy;
@Deprecated
protected Topology.AutoOffsetReset legacyResetPolicy;
protected AutoOffsetReset resetPolicy;
protected String processorName;
@SuppressWarnings("deprecation")
private Consumed(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset resetPolicy,
final Topology.AutoOffsetReset legacyResetPolicy,
final AutoOffsetReset resetPolicy,
final String processorName) {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.timestampExtractor = timestampExtractor;
this.legacyResetPolicy = legacyResetPolicy;
this.resetPolicy = resetPolicy;
this.processorName = processorName;
}
/**
* Create an instance of {@link Consumed} from an existing instance.
* @param consumed the instance of {@link Consumed} to copy
*/
protected Consumed(final Consumed<K, V> consumed) {
this(consumed.keySerde,
this(
consumed.keySerde,
consumed.valueSerde,
consumed.timestampExtractor,
consumed.legacyResetPolicy,
consumed.resetPolicy,
consumed.processorName
);
}
// Maps the deprecated Topology.AutoOffsetReset enum onto the new AutoOffsetReset
// class. A null policy stays null, preserving "use the default from config".
@Deprecated
private static AutoOffsetReset convertOldToNew(final Topology.AutoOffsetReset resetPolicy) {
    if (resetPolicy == null) {
        return null;
    }
    // The old enum only defines EARLIEST and LATEST, so a two-way branch is exhaustive.
    return resetPolicy == org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST
        ? AutoOffsetReset.earliest()
        : AutoOffsetReset.latest();
}
/**
* Create an instance of {@link Consumed} with the supplied arguments. {@code null} values are acceptable.
*
* @param keySerde
* the key serde. If {@code null} the default key serde from config will be used
* @param valueSerde
* the value serde. If {@code null} the default value serde from config will be used
* @param timestampExtractor
* the timestamp extractor to used. If {@code null} the default timestamp extractor from config will be used
* @param resetPolicy
* the offset reset policy to be used. If {@code null} the default reset policy from config will be used
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@link Consumed}
*
* @deprecated Since 4.0. Use {@link #with(Serde, Serde, TimestampExtractor, AutoOffsetReset)} instead.
*/
@Deprecated
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
                                         final Serde<V> valueSerde,
                                         final TimestampExtractor timestampExtractor,
                                         final Topology.AutoOffsetReset resetPolicy) {
    // Stores the legacy enum alongside its converted new-style equivalent so both
    // the deprecated and the new reset-policy accessors see the same policy.
    return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, convertOldToNew(resetPolicy), null);
}
/**
* Create an instance of {@link Consumed} with the supplied arguments. {@code null} values are acceptable.
*
@ -99,8 +149,8 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset resetPolicy) {
return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, null);
final AutoOffsetReset resetPolicy) {
return new Consumed<>(keySerde, valueSerde, timestampExtractor, null, resetPolicy, null);
}
/**
@ -118,7 +168,7 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
*/
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new Consumed<>(keySerde, valueSerde, null, null, null);
return new Consumed<>(keySerde, valueSerde, null, null, null, null);
}
/**
@ -133,7 +183,25 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtractor) {
return new Consumed<>(null, null, timestampExtractor, null, null);
return new Consumed<>(null, null, timestampExtractor, null, null, null);
}
/**
* Create an instance of {@link Consumed} with a {@link org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
*
* @param resetPolicy
* the offset reset policy to be used. If {@code null} the default reset policy from config will be used
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@link Consumed}
*
* @deprecated Since 4.0. Use {@link #with(AutoOffsetReset)} instead.
*/
@Deprecated
public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) {
    // Stores the legacy enum alongside its converted new-style equivalent so both
    // the deprecated and the new reset-policy accessors see the same policy.
    return new Consumed<>(null, null, null, resetPolicy, convertOldToNew(resetPolicy), null);
}
/**
@ -147,8 +215,8 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
*
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) {
return new Consumed<>(null, null, null, resetPolicy, null);
public static <K, V> Consumed<K, V> with(final AutoOffsetReset resetPolicy) {
return new Consumed<>(null, null, null, null, resetPolicy, null);
}
/**
@ -163,7 +231,7 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> as(final String processorName) {
return new Consumed<>(null, null, null, null, processorName);
return new Consumed<>(null, null, null, null, null, processorName);
}
/**
@ -175,7 +243,7 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withKeySerde(final Serde<K> keySerde) {
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
return new Consumed<>(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy, processorName);
}
/**
@ -187,7 +255,7 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withValueSerde(final Serde<V> valueSerde) {
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
return new Consumed<>(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy, processorName);
}
/**
@ -199,7 +267,29 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withTimestampExtractor(final TimestampExtractor timestampExtractor) {
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
return new Consumed<>(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy, processorName);
}
/**
* Configure the instance of {@link Consumed} with a {@link org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
*
* @param resetPolicy
* the offset reset policy to be used. If {@code null} the default reset policy from config will be used
*
* @return a new instance of {@link Consumed}
*
* @deprecated Since 4.0. Use {@link #withOffsetResetPolicy(AutoOffsetReset)} instead.
*/
@Deprecated
public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy) {
    // Stores the legacy enum alongside its converted new-style equivalent so both
    // the deprecated and the new reset-policy accessors see the same policy.
    return new Consumed<>(
        keySerde,
        valueSerde,
        timestampExtractor,
        resetPolicy,
        convertOldToNew(resetPolicy),
        processorName
    );
}
/**
@ -210,8 +300,8 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
*
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy) {
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
public Consumed<K, V> withOffsetResetPolicy(final AutoOffsetReset resetPolicy) {
return new Consumed<>(keySerde, valueSerde, timestampExtractor, null, resetPolicy, processorName);
}
/**
@ -224,7 +314,7 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
*/
@Override
public Consumed<K, V> withName(final String processorName) {
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
return new Consumed<>(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy, processorName);
}
@Override
@ -239,11 +329,12 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
return Objects.equals(keySerde, consumed.keySerde) &&
Objects.equals(valueSerde, consumed.valueSerde) &&
Objects.equals(timestampExtractor, consumed.timestampExtractor) &&
legacyResetPolicy == consumed.legacyResetPolicy &&
resetPolicy == consumed.resetPolicy;
}
@Override
public int hashCode() {
return Objects.hash(keySerde, valueSerde, timestampExtractor, resetPolicy);
return Objects.hash(keySerde, valueSerde, timestampExtractor, legacyResetPolicy, resetPolicy);
}
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.TimestampExtractor;
@ -28,11 +29,10 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
super(consumed);
}
public ConsumedInternal(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset offsetReset) {
final AutoOffsetResetInternal offsetReset) {
this(Consumed.with(keySerde, valueSerde, timestampExtractor, offsetReset));
}
@ -60,8 +60,14 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
return timestampExtractor;
}
public Topology.AutoOffsetReset offsetResetPolicy() {
return resetPolicy;
public AutoOffsetResetInternal offsetResetPolicy() {
return resetPolicy == null ? null : new AutoOffsetResetInternal(resetPolicy);
}
@SuppressWarnings("deprecation")
// TODO mjsax remove
public Topology.AutoOffsetReset legacyOffsetResetPolicy() {
return legacyResetPolicy;
}
public String name() {

View File

@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.Topology.AutoOffsetReset;
import org.apache.kafka.streams.AutoOffsetReset;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
@ -73,14 +73,16 @@ public class StreamSourceNode<K, V> extends SourceGraphNode<K, V> {
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
if (topicPattern().isPresent()) {
topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
// TODO mjsax
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
nodeName(),
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
consumedInternal().valueDeserializer(),
topicPattern().get());
} else {
topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
// TODO mjsax
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
nodeName(),
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),

View File

@ -96,7 +96,8 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K, V> {
false
);
} else {
topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
// TODO mjsax
topologyBuilder.addSource(consumedInternal().legacyOffsetResetPolicy(),
sourceName,
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),

View File

@ -438,6 +438,7 @@ public class InternalTopologyBuilder {
return this;
}
@SuppressWarnings("deprecation")
public final void addSource(final Topology.AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
@ -465,6 +466,7 @@ public class InternalTopologyBuilder {
nodeGroups = null;
}
@SuppressWarnings("deprecation")
public final void addSource(final Topology.AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
@ -915,6 +917,7 @@ public class InternalTopologyBuilder {
}
@SuppressWarnings("deprecation")
private <T> void maybeAddToResetList(final Collection<T> earliestResets,
final Collection<T> latestResets,
final Topology.AutoOffsetReset offsetReset,

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Unit tests for the public {@code AutoOffsetReset} factory methods and the internal
 * {@code AutoOffsetResetInternal} wrapper: duration presence per strategy, negative-duration
 * validation, and the equals/hashCode contract.
 */
class AutoOffsetResetTest {

    @Test
    void latestShouldReturnAnEmptyDuration() {
        final AutoOffsetResetInternal latest = new AutoOffsetResetInternal(AutoOffsetReset.latest());

        // Only the "by_duration" strategy carries a duration; "latest" must not.
        assertTrue(latest.duration().isEmpty(), "Latest should have an empty duration.");
    }

    @Test
    void earliestShouldReturnAnEmptyDuration() {
        final AutoOffsetResetInternal earliest = new AutoOffsetResetInternal(AutoOffsetReset.earliest());

        // Only the "by_duration" strategy carries a duration; "earliest" must not.
        assertTrue(earliest.duration().isEmpty(), "Earliest should have an empty duration.");
    }

    @Test
    void customDurationShouldMatchExpectedValue() {
        final Duration duration = Duration.ofSeconds(10L);
        final AutoOffsetResetInternal custom = new AutoOffsetResetInternal(AutoOffsetReset.byDuration(duration));

        // The wrapper must surface exactly the duration the policy was created with.
        assertEquals(10L, custom.duration().get().toSeconds(), "Duration should match the specified value in seconds.");
    }

    @Test
    void shouldThrowExceptionIfDurationIsNegative() {
        final IllegalArgumentException exception = assertThrows(
            IllegalArgumentException.class,
            () -> AutoOffsetReset.byDuration(Duration.ofSeconds(-1)),
            "Creating an AutoOffsetReset with a negative duration should throw an IllegalArgumentException."
        );

        assertEquals("Duration cannot be negative", exception.getMessage(), "Exception message should indicate the duration cannot be negative.");
    }

    @Test
    void twoInstancesCreatedAtTheSameTimeWithSameOptionsShouldBeEqual() {
        final AutoOffsetReset latest1 = AutoOffsetReset.latest();
        final AutoOffsetReset latest2 = AutoOffsetReset.latest();
        final AutoOffsetReset earliest1 = AutoOffsetReset.earliest();
        final AutoOffsetReset earliest2 = AutoOffsetReset.earliest();
        final AutoOffsetReset custom1 = AutoOffsetReset.byDuration(Duration.ofSeconds(5));
        final AutoOffsetReset custom2 = AutoOffsetReset.byDuration(Duration.ofSeconds(5));
        final AutoOffsetReset customDifferent = AutoOffsetReset.byDuration(Duration.ofSeconds(10));

        // equals: value semantics — same strategy (and same duration, if any) means equal.
        assertEquals(latest1, latest2, "Two latest instances should be equal.");
        assertEquals(earliest1, earliest2, "Two earliest instances should be equal.");
        assertEquals(custom1, custom2, "Two custom instances with the same duration should be equal.");
        assertNotEquals(latest1, earliest1, "Latest and earliest should not be equal.");
        assertNotEquals(custom1, customDifferent, "Custom instances with different durations should not be equal.");

        // hashCode: must be consistent with equals for the cases above.
        assertEquals(latest1.hashCode(), latest2.hashCode(), "HashCode for equal instances should be the same.");
        assertEquals(custom1.hashCode(), custom2.hashCode(), "HashCode for equal custom instances should be the same.");
        assertNotEquals(custom1.hashCode(), customDifferent.hashCode(), "HashCode for different custom instances should not match.");
    }
}

View File

@ -2257,7 +2257,7 @@ public class TopologyTest {
private TopologyDescription.Source addSource(final String sourceName,
final String... sourceTopic) {
topology.addSource(null, sourceName, null, null, null, sourceTopic);
topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, null, null, sourceTopic);
final StringBuilder allSourceTopics = new StringBuilder(sourceTopic[0]);
for (int i = 1; i < sourceTopic.length; ++i) {
allSourceTopics.append(", ").append(sourceTopic[i]);
@ -2267,7 +2267,7 @@ public class TopologyTest {
private TopologyDescription.Source addSource(final String sourceName,
final Pattern sourcePattern) {
topology.addSource(null, sourceName, null, null, null, sourcePattern);
topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, null, null, sourcePattern);
return new InternalTopologyBuilder.Source(sourceName, null, sourcePattern);
}

View File

@ -18,7 +18,7 @@ package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ}
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.{AutoOffsetReset, Topology}
import org.apache.kafka.streams.processor.TimestampExtractor
object Consumed {
@ -36,12 +36,32 @@ object Consumed {
* @param valueSerde the value serde to use.
* @return a new instance of [[Consumed]]
*/
@deprecated("Use `with` method that accepts `AutoOffsetReset` instead", "4.0.0")
def `with`[K, V](
timestampExtractor: TimestampExtractor,
resetPolicy: Topology.AutoOffsetReset
)(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)
/**
* Create an instance of [[Consumed]] with the supplied arguments. `null` values are acceptable.
*
* @tparam K key type
* @tparam V value type
* @param timestampExtractor the timestamp extractor to used. If `null` the default timestamp extractor from
* config will be used
* @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config
* will be used
* @param keySerde the key serde to use.
* @param valueSerde the value serde to use.
* @return a new instance of [[Consumed]]
*/
def `with`[K, V](
timestampExtractor: TimestampExtractor,
resetPolicy: AutoOffsetReset
)(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)
/**
* Create an instance of [[Consumed]] with key and value Serdes.
*
@ -74,8 +94,22 @@ object Consumed {
* @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config will be used
* @return a new instance of [[Consumed]]
*/
@deprecated("Use `with` method that accepts `AutoOffsetReset` instead", "4.0.0")
def `with`[K, V](
resetPolicy: Topology.AutoOffsetReset
)(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
/**
* Create an instance of [[Consumed]] with a `org.apache.kafka.streams.AutoOffsetReset`.
*
* @tparam K key type
* @tparam V value type
* @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config will be used
* @return a new instance of [[Consumed]]
*/
def `with`[K, V](
resetPolicy: AutoOffsetReset
)(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
}

View File

@ -16,7 +16,8 @@
*/
package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.Topology
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy
import org.apache.kafka.streams.AutoOffsetReset
import org.apache.kafka.streams.kstream.internals.ConsumedInternal
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp
import org.apache.kafka.streams.scala.serialization.Serdes
@ -38,15 +39,15 @@ class ConsumedTest {
@Test
def testCreateConsumedWithTimestampExtractorAndResetPolicy(): Unit = {
val timestampExtractor = new FailOnInvalidTimestamp()
val resetPolicy = Topology.AutoOffsetReset.LATEST
val resetPolicy = AutoOffsetReset.latest()
val consumed: Consumed[String, Long] =
Consumed.`with`[String, Long](timestampExtractor, resetPolicy)
Consumed.`with`(timestampExtractor, resetPolicy)
val internalConsumed = new ConsumedInternal(consumed)
assertEquals(Serdes.stringSerde.getClass, internalConsumed.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalConsumed.valueSerde.getClass)
assertEquals(timestampExtractor, internalConsumed.timestampExtractor)
assertEquals(resetPolicy, internalConsumed.offsetResetPolicy)
assertEquals(AutoOffsetResetStrategy.StrategyType.LATEST, internalConsumed.offsetResetPolicy.offsetResetStrategy())
}
@Test
@ -59,14 +60,15 @@ class ConsumedTest {
assertEquals(Serdes.longSerde.getClass, internalConsumed.valueSerde.getClass)
assertEquals(timestampExtractor, internalConsumed.timestampExtractor)
}
@Test
def testCreateConsumedWithResetPolicy(): Unit = {
val resetPolicy = Topology.AutoOffsetReset.LATEST
val resetPolicy = AutoOffsetReset.latest()
val consumed: Consumed[String, Long] = Consumed.`with`[String, Long](resetPolicy)
val internalConsumed = new ConsumedInternal(consumed)
assertEquals(Serdes.stringSerde.getClass, internalConsumed.keySerde.getClass)
assertEquals(Serdes.longSerde.getClass, internalConsumed.valueSerde.getClass)
assertEquals(resetPolicy, internalConsumed.offsetResetPolicy)
assertEquals(AutoOffsetResetStrategy.StrategyType.LATEST, internalConsumed.offsetResetPolicy.offsetResetStrategy())
}
}