KAFKA-6966: Extend TopologyDescription to better represent Source and Sink Nodes (#5284)

Implements KIP-321

Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>
This commit is contained in:
nprad 2018-08-10 18:12:03 -05:00 committed by Matthias J. Sax
parent be43e2330e
commit 79a2f892ca
4 changed files with 121 additions and 22 deletions

View File

@ -90,6 +90,14 @@
We have also removed some public APIs that are deprecated prior to 1.0.x in 2.0.0.
See below for a detailed list of removed APIs.
</p>
<h3><a id="streams_api_changes_210" href="#streams_api_changes_210">Streams API changes in 2.1.0</a></h3>
<p>
We updated <code>TopologyDescription</code> API to allow for better runtime checking.
Users are encouraged to use <code>#topicSet()</code> and <code>#topicPattern()</code> accordingly on <code>TopologyDescription.Source</code> nodes,
instead of using <code>#topics()</code>, which has since been deprecated. Similarly, use <code>#topic()</code> and <code>#topicNameExtractor()</code>
to get descriptions of <code>TopologyDescription.Sink</code> nodes. For more details, see
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes">KIP-321</a>.
</p>
<h3><a id="streams_api_changes_200" href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
<p>

View File

@ -16,9 +16,11 @@
*/
package org.apache.kafka.streams;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StreamTask;
import java.util.Set;
import java.util.regex.Pattern;
/**
* A meta representation of a {@link Topology topology}.
@ -113,8 +115,22 @@ public interface TopologyDescription {
/**
* The topic names this source node is reading from.
* For pattern-subscribed sources this is the pattern rendered as a String rather than a topic list.
* @return comma separated list of topic names or pattern (as String)
* @deprecated use {@link #topicSet()} or {@link #topicPattern()} instead
*/
@Deprecated
String topics();
/**
* The topic names this source node is reading from.
* @return a set of topic names; may be {@code null} if the source subscribes via a {@link #topicPattern() pattern}
*/
Set<String> topicSet();
/**
* The pattern used to match topic names that this source node is reading from.
* @return the pattern used to match topic names; may be {@code null} if the source subscribes to explicitly named topics
*/
Pattern topicPattern();
}
/**
@ -134,10 +150,17 @@ public interface TopologyDescription {
interface Sink extends Node {
    /**
     * The topic name this sink node is writing to.
     * Could be {@code null} if the topic name can only be dynamically determined based on {@link TopicNameExtractor}
     * @return a topic name
     */
    String topic();

    /**
     * The {@link TopicNameExtractor} class that this sink node uses to dynamically extract the topic name to write to.
     * Could be {@code null} if the topic name is not dynamically determined.
     * @return the {@link TopicNameExtractor} class used to get the topic name
     */
    TopicNameExtractor topicNameExtractor();
}
/**

View File

@ -282,15 +282,7 @@ public class InternalTopologyBuilder {
@Override
Source describe() {
    // Hand the description node a defensive copy of the subscribed topics plus the
    // subscription pattern (if any); the Source node exposes whichever form was used.
    return new Source(name, new HashSet<>(topics), pattern);
}
}
@ -1337,7 +1329,7 @@ public class InternalTopologyBuilder {
final String storeName,
final String topicName,
final int id) {
source = new Source(sourceName, topicName);
source = new Source(sourceName, Collections.singleton(topicName), null);
processor = new Processor(processorName, Collections.singleton(storeName));
source.successors.add(processor);
processor.predecessors.add(source);
@ -1424,19 +1416,33 @@ public class InternalTopologyBuilder {
}
public final static class Source extends AbstractNode implements TopologyDescription.Source {
private final String topics;
private final Set<String> topics;
private final Pattern topicPattern;
public Source(final String name,
              final Set<String> topics,
              final Pattern pattern) {
    super(name);
    // Callers pass null for whichever subscription form is unused
    // (explicit topic names vs. a pattern) — see the describe()/addSource call sites.
    this.topics = topics;
    this.topicPattern = pattern;
}
@Deprecated
@Override
public String topics() {
    // Pattern-subscribed sources are constructed with a null topic set; fall back to the
    // pattern's string form to preserve the pre-KIP-321 contract instead of throwing NPE.
    return topics == null ? topicPattern.toString() : topics.toString();
}
@Override
public String topics() {
public Set<String> topicSet() {
return topics;
}
/**
 * The subscription pattern; {@code null} when the source was given explicit topic names.
 */
@Override
public Pattern topicPattern() {
    return this.topicPattern;
}
@Override
public void addPredecessor(final TopologyDescription.Node predecessor) {
throw new UnsupportedOperationException("Sources don't have predecessors.");
@ -1444,7 +1450,9 @@ public class InternalTopologyBuilder {
@Override
public String toString() {
    // Pattern-subscribed sources carry a null topic set; render the pattern instead.
    final String topicsString = topics == null ? topicPattern.toString() : topics.toString();
    return "Source: " + name + " (topics: " + topicsString + ")\n --> " + nodeNames(successors);
}
@Override
@ -1459,13 +1467,14 @@ public class InternalTopologyBuilder {
final Source source = (Source) o;
// omit successor to avoid infinite loops
return name.equals(source.name)
&& topics.equals(source.topics);
&& topics.equals(source.topics)
&& topicPattern.equals(source.topicPattern);
}
@Override
public int hashCode() {
    // omit successor as it might change and alter the hash code
    // Objects.hash is null-tolerant, so a null topic set or pattern is safe here.
    return Objects.hash(name, topics, topicPattern);
}
}
@ -1528,10 +1537,20 @@ public class InternalTopologyBuilder {
@Override
public String topic() {
if (topicNameExtractor instanceof StaticTopicNameExtractor)
if (topicNameExtractor instanceof StaticTopicNameExtractor) {
return ((StaticTopicNameExtractor) topicNameExtractor).topicName;
else
} else {
return null;
}
}
/**
 * The dynamic extractor in use, or {@code null} when this sink writes to a fixed topic
 * (i.e. the extractor is a {@code StaticTopicNameExtractor}).
 */
@Override
public TopicNameExtractor topicNameExtractor() {
    return (topicNameExtractor instanceof StaticTopicNameExtractor) ? null : topicNameExtractor;
}
@Override
@ -1541,7 +1560,10 @@ public class InternalTopologyBuilder {
@Override
public String toString() {
    // Fixed-topic sinks print the topic name; dynamically routed sinks print the extractor.
    if (topicNameExtractor instanceof StaticTopicNameExtractor) {
        return "Sink: " + name + " (topic: " + topic() + ")\n <-- " + nodeNames(predecessors);
    }
    return "Sink: " + name + " (extractor class: " + topicNameExtractor + ")\n <-- " + nodeNames(predecessors);
}
@Override

View File

@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
@ -370,6 +371,23 @@ public class TopologyTest {
assertThat(topology.describe(), equalTo(expectedDescription));
}
@Test
public void sinkShouldReturnNullTopicWithDynamicRouting() {
    // A sink built with a dynamic extractor has no fixed topic, so topic() must report null.
    final TopicNameExtractor dynamicExtractor = (key, value, record) -> record.topic() + "-" + key;
    final TopologyDescription.Sink sinkNode = new InternalTopologyBuilder.Sink("sink", dynamicExtractor);
    assertThat(sinkNode.topic(), equalTo(null));
}
@Test
public void sinkShouldReturnTopicNameExtractorWithDynamicRouting() {
    // The sink must hand back the very extractor instance it was constructed with.
    final TopicNameExtractor dynamicExtractor = (key, value, record) -> record.topic() + "-" + key;
    final TopologyDescription.Sink sinkNode = new InternalTopologyBuilder.Sink("sink", dynamicExtractor);
    assertThat(sinkNode.topicNameExtractor(), equalTo(dynamicExtractor));
}
@Test
public void singleSourceShouldHaveSingleSubtopology() {
final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
@ -629,6 +647,34 @@ public class TopologyTest {
assertThat(topology.describe(), equalTo(expectedDescription));
}
@Test
public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
    final StreamsBuilder streamsBuilder = new StreamsBuilder();

    // Anonymous extractor overriding toString() so the topology description
    // embeds a human-readable label instead of a class name.
    final TopicNameExtractor extractor = new TopicNameExtractor() {
        @Override
        public String extract(final Object key, final Object value, final RecordContext recordContext) {
            return recordContext.topic() + "-" + key;
        }

        @Override
        public String toString() {
            return "anonymous topic name extractor. topic is [recordContext.topic()]-[key]";
        }
    };
    streamsBuilder.stream("input-topic").to(extractor);

    final TopologyDescription description = streamsBuilder.build().describe();
    assertEquals(
        "Topologies:\n" +
        " Sub-topology: 0\n" +
        " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
        " --> KSTREAM-SINK-0000000001\n" +
        " Sink: KSTREAM-SINK-0000000001 (extractor class: anonymous topic name extractor. topic is [recordContext.topic()]-[key])\n" +
        " <-- KSTREAM-SOURCE-0000000000\n\n",
        description.toString());
}
@Test
public void kGroupedStreamZeroArgCountShouldPreserveTopologyStructure() {
final StreamsBuilder builder = new StreamsBuilder();
@ -1048,13 +1094,13 @@ public class TopologyTest {
for (int i = 1; i < sourceTopic.length; ++i) {
allSourceTopics.append(", ").append(sourceTopic[i]);
}
return new InternalTopologyBuilder.Source(sourceName, allSourceTopics.toString());
return new InternalTopologyBuilder.Source(sourceName, new HashSet<>(Arrays.asList(sourceTopic)), null);
}
/**
 * Registers a pattern-subscribed source on the test topology and returns the
 * expected description node for it (null topic set, pattern populated).
 */
private TopologyDescription.Source addSource(final String sourceName,
                                             final Pattern sourcePattern) {
    topology.addSource(null, sourceName, null, null, null, sourcePattern);
    return new InternalTopologyBuilder.Source(sourceName, null, sourcePattern);
}
private TopologyDescription.Processor addProcessor(final String processorName,