From 79a2f892caef38f28c3ad837ad28b63dcbb58b87 Mon Sep 17 00:00:00 2001
From: nprad
Date: Fri, 10 Aug 2018 18:12:03 -0500
Subject: [PATCH] KAFKA-6966: Extend TopologyDescription to better represent
Source and (#5284)
Implements KIP-321
Reviewers: Matthias J. Sax , John Roesler , Bill Bejeck
---
docs/streams/upgrade-guide.html | 8 +++
.../kafka/streams/TopologyDescription.java | 25 +++++++-
.../internals/InternalTopologyBuilder.java | 60 +++++++++++++------
.../apache/kafka/streams/TopologyTest.java | 50 +++++++++++++++-
4 files changed, 121 insertions(+), 22 deletions(-)
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 34f66ce53fe..35e1f77fd4c 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -90,6 +90,14 @@
We have also removed some public APIs that are deprecated prior to 1.0.x in 2.0.0.
See below for a detailed list of removed APIs.
+
+
+ We updated TopologyDescription API to allow for better runtime checking.
+ Users are encouraged to use #topicSet() and #topicPattern() accordingly on TopologyDescription.Source nodes,
+ instead of using #topics(), which has since been deprecated. Similarly, use #topic() and #topicNameExtractor()
+ to get descriptions of TopologyDescription.Sink nodes. For more details, see
+ KIP-321.
+
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 04a292f9a97..870052d7399 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.streams;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StreamTask;
import java.util.Set;
+import java.util.regex.Pattern;
/**
* A meta representation of a {@link Topology topology}.
@@ -113,8 +115,22 @@ public interface TopologyDescription {
/**
* The topic names this source node is reading from.
* @return comma separated list of topic names or pattern (as String)
+ * @deprecated use {@link #topicSet()} or {@link #topicPattern()} instead
*/
+ @Deprecated
String topics();
+
+ /**
+ * The topic names this source node is reading from.
+ * @return a set of topic names
+ */
+ Set topicSet();
+
+ /**
+ * The pattern used to match topic names that is reading from.
+ * @return the pattern used to match topic names
+ */
+ Pattern topicPattern();
}
/**
@@ -134,10 +150,17 @@ public interface TopologyDescription {
interface Sink extends Node {
/**
* The topic name this sink node is writing to.
- * Could be null if the topic name can only be dynamically determined based on {@code TopicNameExtractor}
+ * Could be {@code null} if the topic name can only be dynamically determined based on {@link TopicNameExtractor}
* @return a topic name
*/
String topic();
+
+ /**
+ * The {@link TopicNameExtractor} class that this sink node uses to dynamically extract the topic name to write to.
+ * Could be {@code null} if the topic name is not dynamically determined.
+ * @return the {@link TopicNameExtractor} class used get the topic name
+ */
+ TopicNameExtractor topicNameExtractor();
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index b7e4303cfb4..99d616f60e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -282,15 +282,7 @@ public class InternalTopologyBuilder {
@Override
Source describe() {
- final String sourceTopics;
-
- if (pattern == null) {
- sourceTopics = topics.toString();
- } else {
- sourceTopics = pattern.toString();
- }
-
- return new Source(name, sourceTopics);
+ return new Source(name, new HashSet<>(topics), pattern);
}
}
@@ -1337,7 +1329,7 @@ public class InternalTopologyBuilder {
final String storeName,
final String topicName,
final int id) {
- source = new Source(sourceName, topicName);
+ source = new Source(sourceName, Collections.singleton(topicName), null);
processor = new Processor(processorName, Collections.singleton(storeName));
source.successors.add(processor);
processor.predecessors.add(source);
@@ -1424,19 +1416,33 @@ public class InternalTopologyBuilder {
}
public final static class Source extends AbstractNode implements TopologyDescription.Source {
- private final String topics;
+ private final Set topics;
+ private final Pattern topicPattern;
public Source(final String name,
- final String topics) {
+ final Set topics,
+ final Pattern pattern) {
super(name);
this.topics = topics;
+ this.topicPattern = pattern;
+ }
+
+ @Deprecated
+ @Override
+ public String topics() {
+ return topics.toString();
}
@Override
- public String topics() {
+ public Set topicSet() {
return topics;
}
+ @Override
+ public Pattern topicPattern() {
+ return topicPattern;
+ }
+
@Override
public void addPredecessor(final TopologyDescription.Node predecessor) {
throw new UnsupportedOperationException("Sources don't have predecessors.");
@@ -1444,7 +1450,9 @@ public class InternalTopologyBuilder {
@Override
public String toString() {
- return "Source: " + name + " (topics: " + topics + ")\n --> " + nodeNames(successors);
+ final String topicsString = topics == null ? topicPattern.toString() : topics.toString();
+
+ return "Source: " + name + " (topics: " + topicsString + ")\n --> " + nodeNames(successors);
}
@Override
@@ -1459,13 +1467,14 @@ public class InternalTopologyBuilder {
final Source source = (Source) o;
// omit successor to avoid infinite loops
return name.equals(source.name)
- && topics.equals(source.topics);
+ && topics.equals(source.topics)
+ && topicPattern.equals(source.topicPattern);
}
@Override
public int hashCode() {
// omit successor as it might change and alter the hash code
- return Objects.hash(name, topics);
+ return Objects.hash(name, topics, topicPattern);
}
}
@@ -1528,10 +1537,20 @@ public class InternalTopologyBuilder {
@Override
public String topic() {
- if (topicNameExtractor instanceof StaticTopicNameExtractor)
+ if (topicNameExtractor instanceof StaticTopicNameExtractor) {
return ((StaticTopicNameExtractor) topicNameExtractor).topicName;
- else
+ } else {
return null;
+ }
+ }
+
+ @Override
+ public TopicNameExtractor topicNameExtractor() {
+ if (topicNameExtractor instanceof StaticTopicNameExtractor) {
+ return null;
+ } else {
+ return topicNameExtractor;
+ }
}
@Override
@@ -1541,7 +1560,10 @@ public class InternalTopologyBuilder {
@Override
public String toString() {
- return "Sink: " + name + " (topic: " + topic() + ")\n <-- " + nodeNames(predecessors);
+ if (topicNameExtractor instanceof StaticTopicNameExtractor) {
+ return "Sink: " + name + " (topic: " + topic() + ")\n <-- " + nodeNames(predecessors);
+ }
+ return "Sink: " + name + " (extractor class: " + topicNameExtractor + ")\n <-- " + nodeNames(predecessors);
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index ece157cd02e..eeb08ac1f95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -370,6 +371,23 @@ public class TopologyTest {
assertThat(topology.describe(), equalTo(expectedDescription));
}
+ @Test
+ public void sinkShouldReturnNullTopicWithDynamicRouting() {
+ final TopologyDescription.Sink expectedSinkNode
+ = new InternalTopologyBuilder.Sink("sink", (key, value, record) -> record.topic() + "-" + key);
+
+ assertThat(expectedSinkNode.topic(), equalTo(null));
+ }
+
+ @Test
+ public void sinkShouldReturnTopicNameExtractorWithDynamicRouting() {
+ final TopicNameExtractor topicNameExtractor = (key, value, record) -> record.topic() + "-" + key;
+ final TopologyDescription.Sink expectedSinkNode
+ = new InternalTopologyBuilder.Sink("sink", topicNameExtractor);
+
+ assertThat(expectedSinkNode.topicNameExtractor(), equalTo(topicNameExtractor));
+ }
+
@Test
public void singleSourceShouldHaveSingleSubtopology() {
final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
@@ -629,6 +647,34 @@ public class TopologyTest {
assertThat(topology.describe(), equalTo(expectedDescription));
}
+ @Test
+ public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final TopicNameExtractor topicNameExtractor = new TopicNameExtractor() {
+ @Override
+ public String extract(final Object key, final Object value, final RecordContext recordContext) {
+ return recordContext.topic() + "-" + key;
+ }
+
+ @Override
+ public String toString() {
+ return "anonymous topic name extractor. topic is [recordContext.topic()]-[key]";
+ }
+ };
+ builder.stream("input-topic").to(topicNameExtractor);
+ final TopologyDescription describe = builder.build().describe();
+
+ assertEquals(
+ "Topologies:\n" +
+ " Sub-topology: 0\n" +
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
+ " --> KSTREAM-SINK-0000000001\n" +
+ " Sink: KSTREAM-SINK-0000000001 (extractor class: anonymous topic name extractor. topic is [recordContext.topic()]-[key])\n" +
+ " <-- KSTREAM-SOURCE-0000000000\n\n",
+ describe.toString());
+ }
+
@Test
public void kGroupedStreamZeroArgCountShouldPreserveTopologyStructure() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -1048,13 +1094,13 @@ public class TopologyTest {
for (int i = 1; i < sourceTopic.length; ++i) {
allSourceTopics.append(", ").append(sourceTopic[i]);
}
- return new InternalTopologyBuilder.Source(sourceName, allSourceTopics.toString());
+ return new InternalTopologyBuilder.Source(sourceName, new HashSet<>(Arrays.asList(sourceTopic)), null);
}
private TopologyDescription.Source addSource(final String sourceName,
final Pattern sourcePattern) {
topology.addSource(null, sourceName, null, null, null, sourcePattern);
- return new InternalTopologyBuilder.Source(sourceName, sourcePattern.toString());
+ return new InternalTopologyBuilder.Source(sourceName, null, sourcePattern);
}
private TopologyDescription.Processor addProcessor(final String processorName,