From b0cfd1f4ca83e1bc3a324e5d4081b2731c420733 Mon Sep 17 00:00:00 2001 From: Geordie Date: Fri, 25 Jun 2021 00:07:22 +0800 Subject: [PATCH] =?UTF-8?q?KAFKA-12336=20Custom=20stream=20naming=20does?= =?UTF-8?q?=20not=20work=20while=20calling=20stream[K=E2=80=A6=20(#10190)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) Reviewers: Bill Bejeck --- .../internals/InternalStreamsBuilder.java | 2 +- .../streams/scala/kstream/KStreamTest.scala | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index fbfd9e173a4..03ee8bbff08 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -102,7 +102,7 @@ public class InternalStreamsBuilder implements InternalNameProvider { public KStream stream(final Pattern topicPattern, final ConsumedInternal consumed) { - final String name = newProcessorName(KStreamImpl.SOURCE_NAME); + final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME); final StreamSourceNode streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed); addGraphNode(root, streamPatternSourceNode); diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala index 2df4e5801c0..1d8a1f1bd02 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala @@ -18,6 +18,7 @@ package org.apache.kafka.streams.scala.kstream import java.time.Duration.ofSeconds import java.time.Instant +import java.util.regex.Pattern import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.kstream.{ @@ -463,4 +464,23 @@ class KStreamTest extends TestDriver { val transformNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1) assertEquals("my-name", transformNode.name()) } + + @Test + def testSettingNameOnStream(): Unit = { + val builder = new StreamsBuilder() + val topicsPattern = "t-[A-Za-z0-9-].suffix" + val sinkTopic = "sink" + + builder + .stream[String, String](Pattern.compile(topicsPattern))( + Consumed.`with`[String, String].withName("my-fancy-name") + ) + .to(sinkTopic) + + import scala.jdk.CollectionConverters._ + + val streamNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.head + assertEquals("my-fancy-name", streamNode.name()) + } + }