KAFKA-12336 Custom stream naming does not work while calling stream[K… (#10190)

Custom stream naming does not work while calling stream[K, V](topicPattern: Pattern)

Reviewers: Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
Geordie 2021-06-25 00:07:22 +08:00 committed by GitHub
parent 574af88305
commit b0cfd1f4ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 1 deletions

View File

@ -102,7 +102,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
public <K, V> KStream<K, V> stream(final Pattern topicPattern, public <K, V> KStream<K, V> stream(final Pattern topicPattern,
final ConsumedInternal<K, V> consumed) { final ConsumedInternal<K, V> consumed) {
final String name = newProcessorName(KStreamImpl.SOURCE_NAME); final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed); final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed);
addGraphNode(root, streamPatternSourceNode); addGraphNode(root, streamPatternSourceNode);

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.scala.kstream
import java.time.Duration.ofSeconds import java.time.Duration.ofSeconds
import java.time.Instant import java.time.Instant
import java.util.regex.Pattern
import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{ import org.apache.kafka.streams.kstream.{
@ -463,4 +464,23 @@ class KStreamTest extends TestDriver {
val transformNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1) val transformNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1)
assertEquals("my-name", transformNode.name()) assertEquals("my-name", transformNode.name())
} }
@Test
def testSettingNameOnStream(): Unit = {
val builder = new StreamsBuilder()
val topicsPattern = "t-[A-Za-z0-9-].suffix"
val sinkTopic = "sink"
builder
.stream[String, String](Pattern.compile(topicsPattern))(
Consumed.`with`[String, String].withName("my-fancy-name")
)
.to(sinkTopic)
import scala.jdk.CollectionConverters._
val streamNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.head
assertEquals("my-fancy-name", streamNode.name())
}
} }