From 05696037d330e5e355d62cb1b977bac9df78db82 Mon Sep 17 00:00:00 2001
From: Arnav Dadarya <37542991+ardada2468@users.noreply.github.com>
Date: Sat, 28 Sep 2024 18:21:26 -0400
Subject: [PATCH] KAFKA-12823 Remove Deprecated method KStream#through (#16761)

Implements KIP-1087

Reviewers: Matthias J. Sax, Lucas Brutschy, Anna Sophie Blee-Goldman
---
 .../scala/kafka/tools/StreamsResetter.java    |  1 -
 .../developer-guide/app-reset-tool.html       | 17 +----
 .../apache/kafka/streams/kstream/KStream.java | 40 ------------
 .../kstream/internals/KStreamImpl.java        | 34 ----------
 .../kafka/streams/StreamsBuilderTest.java     | 22 -------
 .../kstream/internals/KStreamImplTest.java    | 65 -------------------
 .../internals/graph/StreamsGraphTest.java     | 30 ---------
 .../kafka/streams/scala/kstream/KStream.scala | 34 ----------
 .../apache/kafka/tools/StreamsResetter.java   |  7 +-
 9 files changed, 5 insertions(+), 245 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 40798c3bba9..1eb362286b7 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -16,7 +16,6 @@
  */
 package kafka.tools;

-
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;

diff --git a/docs/streams/developer-guide/app-reset-tool.html b/docs/streams/developer-guide/app-reset-tool.html
index 0951b3b4f61..bf157e38669 100644
--- a/docs/streams/developer-guide/app-reset-tool.html
+++ b/docs/streams/developer-guide/app-reset-tool.html
@@ -36,12 +36,11 @@

You can reset an application and force it to reprocess its data from scratch by using the application reset tool. This can be useful for development and testing, or when fixing bugs.

 The application reset tool handles the Kafka Streams user topics (input,
-output, and intermediate topics) and internal topics differently
+and output) and internal topics differently
 when resetting the application.

Here’s what the application reset tool does for each topic type:

   • Input topics: Reset offsets to specified position (by default to the beginning of the topic).
-  • Intermediate topics: Skip to the end of the topic, i.e., set the application’s committed consumer offsets for all partitions to each partition’s logSize (for consumer group application.id).
   • Internal topics: Delete the internal topic (this automatically deletes any committed offsets).
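For concreteness, the internal topics that the reset tool deletes are the ones prefixed with the application id (the patch's own usage text below describes them as topics starting with "<application.id>-"). A hedged sketch, not part of this patch, that lists the topics the tool would delete; the application id and broker address are assumptions:

    import java.util.Properties;
    import java.util.Set;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class InternalTopicLister {
        public static void main(final String[] args) throws Exception {
            final String applicationId = "my-streams-app";  // assumption: your application.id
            final Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumption
            try (final Admin admin = Admin.create(props)) {
                // Internal topics are prefixed with "<application.id>-"; these are the
                // topics the reset tool deletes (along with their committed offsets).
                final Set<String> allTopics = admin.listTopics().names().get();
                allTopics.stream()
                         .filter(topic -> topic.startsWith(applicationId + "-"))
                         .forEach(System.out::println);
            }
        }
    }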

The application reset tool does not:

@@ -61,16 +60,6 @@
  • Use this tool with care and double-check its parameters: If you provide wrong parameter values (e.g., typos in application.id) or specify parameters inconsistently (e.g., specify the wrong input topics for the application), this tool might invalidate the application’s state or even impact other applications, consumer groups, or your Kafka topics.

-  • You should manually delete and re-create any intermediate topics before running the application reset tool. This will free up disk space in Kafka brokers.
-  • You should delete and recreate intermediate topics before running the application reset tool, unless the following applies:
-      • You have external downstream consumers for the application’s intermediate topics.
-      • You are in a development environment where manually deleting and re-creating intermediate topics is unnecessary.
@@ -106,10 +95,6 @@
                                            topics. For these topics, the tool will
                                            reset the offset to the earliest
                                            available offset.
---intermediate-topics <String: list>       Comma-separated list of intermediate user
-                                           topics (topics used in the through()
-                                           method). For these topics, the tool
-                                           will skip to the end.
 --internal-topics <String: list>           Comma-separated list of internal topics
                                            to delete. Must be a subset of the
                                            internal topics marked for deletion by

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 4db21b9a157..7a3ab8394b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -797,46 +797,6 @@ public interface KStream<K, V> {
      */
     KStream<K, V> merge(final KStream<K, V> stream, final Named named);

-    /**
-     * Materialize this stream to a topic and creates a new {@code KStream} from the topic using default serializers,
-     * deserializers, and producer's default partitioning strategy.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * This is similar to calling {@link #to(String) #to(someTopicName)} and
-     * {@link StreamsBuilder#stream(String) StreamsBuilder#stream(someTopicName)}.
-     * Note that {@code through()} uses a hard coded {@link org.apache.kafka.streams.processor.FailOnInvalidTimestamp
-     * timestamp extractor} and does not allow to customize it, to ensure correct timestamp propagation.
-     *
-     * @param topic the topic name
-     * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
-     * @deprecated since 2.6; use {@link #repartition()} instead
-     */
-    // TODO: when removed, update `StreamsResetter` description of --intermediate-topics
-    @Deprecated
-    KStream<K, V> through(final String topic);
-
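As the removed Javadoc above states, through(topic) was essentially shorthand for to(topic) followed by re-reading that topic. A minimal sketch of that equivalence; the class and topic names are illustrative, not taken from the patch:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    public class ThroughEquivalenceSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> stream = builder.stream("source-topic");

            // The removed stream.through("user-managed-topic") behaved roughly like:
            stream.to("user-managed-topic");  // topic had to be created manually beforehand
            final KStream<String, String> reread = builder.stream("user-managed-topic");
            // Difference: through() additionally forced the FailOnInvalidTimestamp
            // extractor on the re-read, to ensure correct timestamp propagation.

            reread.to("sink-topic");
            builder.build();
        }
    }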

-    /**
-     * Materialize this stream to a topic and creates a new {@code KStream} from the topic using the
-     * {@link Produced} instance for configuration of the {@link Serde key serde}, {@link Serde value serde},
-     * and {@link StreamPartitioner}.
-     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
-     * started).
-     * <p>
-     * This is similar to calling {@link #to(String, Produced) to(someTopic, Produced.with(keySerde, valueSerde))}
-     * and {@link StreamsBuilder#stream(String, Consumed) StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde))}.
-     * Note that {@code through()} uses a hard coded {@link org.apache.kafka.streams.processor.FailOnInvalidTimestamp
-     * timestamp extractor} and does not allow to customize it, to ensure correct timestamp propagation.
-     *
-     * @param topic the topic name
-     * @param produced the options to use when producing to the topic
-     * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
-     * @deprecated since 2.6; use {@link #repartition(Repartitioned)} instead
-     */
-    @Deprecated
-    KStream<K, V> through(final String topic,
-                          final Produced<K, V> produced);
-
     /**
      * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
      * from the auto-generated topic using default serializers, deserializers, and producer's default partitioning strategy.

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ec13a40bda1..380ee43f451 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -55,7 +55,6 @@ import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamToTableNode;
 import org.apache.kafka.streams.kstream.internals.graph.UnoptimizableRepartitionNode;
 import org.apache.kafka.streams.kstream.internals.graph.UnoptimizableRepartitionNode.UnoptimizableRepartitionNodeBuilder;
-import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
@@ -498,39 +497,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K, V> {
-    @Deprecated
-    @Override
-    public KStream<K, V> through(final String topic) {
-        return through(topic, Produced.with(keySerde, valueSerde, null));
-    }
-
-    @Deprecated
-    @Override
-    public KStream<K, V> through(final String topic,
-                                 final Produced<K, V> produced) {
-        Objects.requireNonNull(topic, "topic can't be null");
-        Objects.requireNonNull(produced, "produced can't be null");
-
-        final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
-        if (producedInternal.keySerde() == null) {
-            producedInternal.withKeySerde(keySerde);
-        }
-        if (producedInternal.valueSerde() == null) {
-            producedInternal.withValueSerde(valueSerde);
-        }
-        to(topic, producedInternal);
-
-        return builder.stream(
-            Collections.singleton(topic),
-            new ConsumedInternal<>(
-                producedInternal.keySerde(),
-                producedInternal.valueSerde(),
-                new FailOnInvalidTimestamp(),
-                null
-            )
-        );
-    }
-
     @Override
     public KStream<K, V> repartition() {
         return doRepartition(Repartitioned.as(null));
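For code touched by this removal, the replacement path is the one named in the removed Javadoc ("@deprecated since 2.6; use repartition() instead"). A minimal migration sketch; the class name, topic names, and serde choices are illustrative assumptions, not taken from the patch:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Repartitioned;

    public class ThroughMigrationSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> input = builder.stream("input-topic");

            // Before (removed): input.through("manually-created-topic")
            // After: Kafka Streams creates and manages the repartition topic itself.
            final KStream<String, String> repartitioned = input.repartition(
                Repartitioned.<String, String>as("rekeyed")  // names the internal topic
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.String()));

            repartitioned.to("output-topic");
            builder.build();
        }
    }

Unlike through(), repartition() needs no manually created topic, which is why the reset tool's notion of "intermediate topics" goes away with this patch.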
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index e3dccba6c51..72a56fc8a73 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -332,28 +332,6 @@ public class StreamsBuilderTest {
                      processorSupplier.theCapturedProcessor().processed());
     }

-    @Deprecated
-    @Test
-    public void shouldProcessViaThroughTopic() {
-        final KStream<String, String> source = builder.stream("topic-source");
-        final KStream<String, String> through = source.through("topic-sink");
-
-        final MockApiProcessorSupplier<String, String, Void, Void> sourceProcessorSupplier = new MockApiProcessorSupplier<>();
-        source.process(sourceProcessorSupplier);
-
-        final MockApiProcessorSupplier<String, String, Void, Void> throughProcessorSupplier = new MockApiProcessorSupplier<>();
-        through.process(throughProcessorSupplier);
-
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
-            final TestInputTopic<String, String> inputTopic =
-                driver.createInputTopic("topic-source", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            inputTopic.pipeInput("A", "aa");
-        }
-
-        assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed());
-        assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed());
-    }
-
     @Test
     public void shouldProcessViaRepartitionTopic() {
         final KStream<String, String> source = builder.stream("topic-source");

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index e2c08ebac0c..c5767e62ff8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -515,33 +515,6 @@ public class KStreamImplTest {
         assertThat(exception.getMessage(), equalTo("named can't be null"));
     }

-    @Deprecated // specifically testing the deprecated variant
-    @Test
-    public void shouldNotAllowNullTopicOnThrough() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.through(null));
-        assertThat(exception.getMessage(), equalTo("topic can't be null"));
-    }
-
-    @Deprecated // specifically testing the deprecated variant
-    @Test
-    public void shouldNotAllowNullTopicOnThroughWithProduced() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.through(null, Produced.as("through")));
-        assertThat(exception.getMessage(), equalTo("topic can't be null"));
-    }
-
-    @Deprecated // specifically testing the deprecated variant
-    @Test
-    public void shouldNotAllowNullProducedOnThrough() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.through("topic", null));
-        assertThat(exception.getMessage(), equalTo("produced can't be null"));
-    }
-
     @Test
     public void shouldNotAllowNullTopicOnTo() {
         final NullPointerException exception = assertThrows(
@@ -1277,10 +1250,6 @@ public class KStreamImplTest {
         assertNull(((AbstractStream) stream1.merge(stream1)).keySerde());
         assertNull(((AbstractStream) stream1.merge(stream1)).valueSerde());

-        assertEquals(((AbstractStream) stream1.through("topic-3")).keySerde(), consumedInternal.keySerde());
-        assertEquals(((AbstractStream) stream1.through("topic-3")).valueSerde(), consumedInternal.valueSerde());
-        assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).keySerde(), mySerde);
-        assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).valueSerde(), mySerde);
         assertEquals(((AbstractStream) stream1.repartition()).keySerde(), consumedInternal.keySerde());
         assertEquals(((AbstractStream) stream1.repartition()).valueSerde(), consumedInternal.valueSerde());
@@ -1329,24 +1298,6 @@ public class KStreamImplTest {
         assertNull(((AbstractStream) stream1.leftJoin(table2, selector, joiner)).valueSerde());
     }

-    @Deprecated
-    @Test
-    public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), stringConsumed);
-        final KStream<String, String> stream2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);
-
-        stream1.to("topic-5");
-        stream2.through("topic-6");
-
-        final ProcessorTopology processorTopology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology();
-        assertThat(processorTopology.source("topic-6").timestampExtractor(), instanceOf(FailOnInvalidTimestamp.class));
-        assertNull(processorTopology.source("topic-4").timestampExtractor());
-        assertNull(processorTopology.source("topic-3").timestampExtractor());
-        assertNull(processorTopology.source("topic-2").timestampExtractor());
-        assertNull(processorTopology.source("topic-1").timestampExtractor());
-    }
-
     @Test
     public void shouldUseRecordMetadataTimestampExtractorWithRepartition() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -1364,22 +1315,6 @@ public class KStreamImplTest {
         assertNull(processorTopology.source("topic-1").timestampExtractor());
     }

-    @Deprecated
-    @Test
-    public void shouldSendDataThroughTopicUsingProduced() {
-        final StreamsBuilder builder = new StreamsBuilder();
-        final String input = "topic";
-        final KStream<String, String> stream = builder.stream(input, stringConsumed);
-        stream.through("through-topic", Produced.with(Serdes.String(), Serdes.String())).process(processorSupplier);
-
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
-            final TestInputTopic<String, String> inputTopic =
-                driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
-            inputTopic.pipeInput("a", "b");
-        }
-        assertThat(processorSupplier.theCapturedProcessor().processed(), equalTo(Collections.singletonList(new KeyValueTimestamp<>("a", "b", 0))));
-    }
-
     @Test
     public void shouldSendDataThroughRepartitionTopicUsingRepartitioned() {
         final StreamsBuilder builder = new StreamsBuilder();

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index 79ea95118f4..e046ba19533 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -195,19 +195,6 @@ public class StreamsGraphTest {
         assertEquals(2, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
     }

-    // no need to optimize as user has already performed the repartitioning manually
-    @Deprecated
-    @Test
-    public void shouldNotOptimizeWhenAThroughOperationIsDone() {
-        final Topology attemptedOptimize = getTopologyWithThroughOperation(StreamsConfig.OPTIMIZE);
-        final Topology noOptimization = getTopologyWithThroughOperation(StreamsConfig.NO_OPTIMIZATION);
-
-        assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString());
-        assertEquals(0, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
-        assertEquals(0, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
-
-    }
-
     @Test
     public void shouldOptimizeSeveralMergeNodesWithCommonKeyChangingParent() {
         final StreamsBuilder streamsBuilder = new StreamsBuilder();
@@ -256,23 +243,6 @@ public class StreamsGraphTest {
     }

-    @Deprecated // specifically testing the deprecated variant
-    private Topology getTopologyWithThroughOperation(final String optimizeConfig) {
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final Properties properties = new Properties();
-        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig);
-
-        final KStream<String, String> inputStream = builder.stream("input");
-        final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v).through("through-topic");
-
-        mappedKeyStream.groupByKey().count().toStream().to("output");
-        mappedKeyStream.groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000))).count().toStream().to("windowed-output");
-
-        return builder.build(properties);
-
-    }
-
     private Topology getTopologyWithRepartitionOperation(final String optimizeConfig) {
         final StreamsBuilder builder = new StreamsBuilder();
         final Properties properties = new Properties();

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 5886482a5f0..311def571d8 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -349,40 +349,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {

   def split(named: Named): BranchedKStream[K, V] = new BranchedKStream(inner.split(named))

-  /**
-   * Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Produced` instance for
-   * configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`
-   * <p>
-   * The user can either supply the `Produced` instance as an implicit in scope or they can also provide implicit
-   * key and value serdes that will be converted to a `Produced` instance implicitly.
-   * <p>
-   * {{{
-   * Example:
-   *
-   * // brings implicit serdes in scope
-   * import Serdes._
-   *
-   * //..
-   * val clicksPerRegion: KStream[String, Long] = //..
-   *
-   * // Implicit serdes in scope will generate an implicit Produced instance, which
-   * // will be passed automatically to the call of through below
-   * clicksPerRegion.through(topic)
-   *
-   * // Similarly you can create an implicit Produced and it will be passed implicitly
-   * // to the through call
-   * }}}
-   *
-   * @param topic the topic name
-   * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
-   * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
-   * @see `org.apache.kafka.streams.kstream.KStream#through`
-   * @deprecated use `repartition()` instead
-   */
-  @deprecated("use `repartition()` instead", "2.6.0")
-  def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V] =
-    new KStream(inner.through(topic, produced))
-
   /**
    * Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Repartitioned` instance
    * for configuration of the `Serde key serde`, `Serde value serde`, `StreamPartitioner`, number of partitions, and

diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
index a4894e22875..526d9661928 100644
--- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
@@ -59,6 +59,7 @@ import joptsimple.OptionException;
 import joptsimple.OptionSpec;
 import joptsimple.OptionSpecBuilder;

+
 /**
  * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example,
  * you can reprocess its input from scratch.
@@ -93,7 +94,6 @@ public class StreamsResetter {
     private static final String USAGE = "This tool helps to quickly reset an application in order to reprocess "
         + "its data from scratch.\n"
         + "* This tool resets offsets of input topics to the earliest available offset (by default), or to a specific defined position"
-        + " and it skips to the end of intermediate topics (topics that are input and output topics, e.g., used by deprecated through() method).\n"
         + "* This tool deletes the internal topics that were created by Kafka Streams (topics starting with "
         + "\"<application.id>-\").\n"
         + "The tool finds these internal topics automatically. If the topics flagged automatically for deletion by "
@@ -577,8 +577,8 @@ public class StreamsResetter {
             .ofType(String.class)
             .withValuesSeparatedBy(',')
             .describedAs("list");
-        intermediateTopicsOption = parser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics that are input and output topics, "
-            + "e.g., used in the deprecated through() method). For these topics, the tool will skip to the end.")
+        intermediateTopicsOption = parser.accepts("intermediate-topics", "[deprecated] Comma-separated list of intermediate user topics (topics that are input and output topics). "
+            + "For these topics, the tool will skip to the end.")
             .withRequiredArg()
             .ofType(String.class)
             .withValuesSeparatedBy(',')
@@ -670,6 +670,7 @@ public class StreamsResetter {
     }

     public List<String> intermediateTopicsOption() {
+        System.out.println("intermediateTopicsOption is deprecated and will be removed in a future release");
         return options.valuesOf(intermediateTopicsOption);
     }
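The resetter keeps --intermediate-topics parseable but marks it "[deprecated]" in the help text and prints a warning when the option is read. A toy standalone sketch of that jopt-simple pattern; the class name and option wiring are illustrative, mirroring the code above rather than reproducing it:

    import java.util.List;

    import joptsimple.OptionParser;
    import joptsimple.OptionSet;
    import joptsimple.OptionSpec;

    public class DeprecatedOptionSketch {
        public static void main(final String[] args) {
            final OptionParser parser = new OptionParser();
            final OptionSpec<String> intermediateTopics = parser
                .accepts("intermediate-topics", "[deprecated] Comma-separated list of intermediate user topics.")
                .withRequiredArg()
                .ofType(String.class)
                .withValuesSeparatedBy(',')
                .describedAs("list");

            final OptionSet options = parser.parse(args);
            if (options.has(intermediateTopics)) {
                // Mirrors the warning the patch adds to intermediateTopicsOption().
                System.out.println("intermediateTopicsOption is deprecated and will be removed in a future release");
                final List<String> topics = options.valuesOf(intermediateTopics);
                System.out.println("Skipping to end for: " + topics);
            }
        }
    }

Keeping the option parseable while warning on use lets existing reset scripts keep working for one more release cycle before the flag is removed outright.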