mirror of https://github.com/apache/kafka.git
KAFKA-3395: prefix job id to internal topic names
guozhangwang Author: Yasuhiro Matsuda <yasuhiro@confluent.io> Reviewers: Guozhang Wang <wangguoz@gmail.com> Closes #1062 from ymatsuda/k3395
This commit is contained in:
parent
ac7b2e95d3
commit
c1a56c6839
|
@ -85,7 +85,7 @@ public class TopologyBuilder {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract ProcessorNode build();
|
public abstract ProcessorNode build(String jobId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ProcessorNodeFactory extends NodeFactory {
|
private static class ProcessorNodeFactory extends NodeFactory {
|
||||||
|
@ -105,7 +105,7 @@ public class TopologyBuilder {
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public ProcessorNode build() {
|
public ProcessorNode build(String jobId) {
|
||||||
return new ProcessorNode(name, supplier.get(), stateStoreNames);
|
return new ProcessorNode(name, supplier.get(), stateStoreNames);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -124,12 +124,12 @@ public class TopologyBuilder {
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public ProcessorNode build() {
|
public ProcessorNode build(String jobId) {
|
||||||
return new SourceNode(name, keyDeserializer, valDeserializer);
|
return new SourceNode(name, keyDeserializer, valDeserializer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class SinkNodeFactory extends NodeFactory {
|
private class SinkNodeFactory extends NodeFactory {
|
||||||
public final String[] parents;
|
public final String[] parents;
|
||||||
public final String topic;
|
public final String topic;
|
||||||
private Serializer keySerializer;
|
private Serializer keySerializer;
|
||||||
|
@ -147,8 +147,13 @@ public class TopologyBuilder {
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public ProcessorNode build() {
|
public ProcessorNode build(String jobId) {
|
||||||
return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
|
if (internalTopicNames.contains(topic)) {
|
||||||
|
// prefix the job id to the internal topic name
|
||||||
|
return new SinkNode(name, jobId + "-" + topic, keySerializer, valSerializer, partitioner);
|
||||||
|
} else {
|
||||||
|
return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -491,7 +496,7 @@ public class TopologyBuilder {
|
||||||
*
|
*
|
||||||
* @return groups of topic names
|
* @return groups of topic names
|
||||||
*/
|
*/
|
||||||
public Map<Integer, TopicsInfo> topicGroups() {
|
public Map<Integer, TopicsInfo> topicGroups(String jobId) {
|
||||||
Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
|
Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
|
||||||
|
|
||||||
if (nodeGroups == null)
|
if (nodeGroups == null)
|
||||||
|
@ -506,27 +511,35 @@ public class TopologyBuilder {
|
||||||
// if the node is a source node, add to the source topics
|
// if the node is a source node, add to the source topics
|
||||||
String[] topics = nodeToSourceTopics.get(node);
|
String[] topics = nodeToSourceTopics.get(node);
|
||||||
if (topics != null) {
|
if (topics != null) {
|
||||||
sourceTopics.addAll(Arrays.asList(topics));
|
|
||||||
|
|
||||||
// if some of the topics are internal, add them to the internal topics
|
// if some of the topics are internal, add them to the internal topics
|
||||||
for (String topic : topics) {
|
for (String topic : topics) {
|
||||||
if (this.internalTopicNames.contains(topic))
|
if (this.internalTopicNames.contains(topic)) {
|
||||||
internalSourceTopics.add(topic);
|
// prefix the job id to the internal topic name
|
||||||
|
String internalTopic = jobId + "-" + topic;
|
||||||
|
internalSourceTopics.add(internalTopic);
|
||||||
|
sourceTopics.add(internalTopic);
|
||||||
|
} else {
|
||||||
|
sourceTopics.add(topic);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the node is a sink node, add to the sink topics
|
// if the node is a sink node, add to the sink topics
|
||||||
String topic = nodeToSinkTopic.get(node);
|
String topic = nodeToSinkTopic.get(node);
|
||||||
if (topic != null)
|
if (topic != null) {
|
||||||
sinkTopics.add(topic);
|
if (internalTopicNames.contains(topic)) {
|
||||||
|
// prefix the job id to the change log topic name
|
||||||
|
sinkTopics.add(jobId + "-" + topic);
|
||||||
|
} else {
|
||||||
|
sinkTopics.add(topic);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// if the node is connected to a state, add to the state topics
|
// if the node is connected to a state, add to the state topics
|
||||||
for (StateStoreFactory stateFactory : stateFactories.values()) {
|
for (StateStoreFactory stateFactory : stateFactories.values()) {
|
||||||
|
|
||||||
// we store the changelog topic here without the job id prefix
|
|
||||||
// since it is within a single job and is only used for
|
|
||||||
if (stateFactory.isInternal && stateFactory.users.contains(node)) {
|
if (stateFactory.isInternal && stateFactory.users.contains(node)) {
|
||||||
stateChangelogTopics.add(stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
|
// prefix the job id to the change log topic name
|
||||||
|
stateChangelogTopics.add(jobId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -586,7 +599,7 @@ public class TopologyBuilder {
|
||||||
|
|
||||||
return nodeGroups;
|
return nodeGroups;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Asserts that the streams of the specified source nodes must be copartitioned.
|
* Asserts that the streams of the specified source nodes must be copartitioned.
|
||||||
*
|
*
|
||||||
|
@ -624,7 +637,7 @@ public class TopologyBuilder {
|
||||||
*
|
*
|
||||||
* @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
|
* @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
|
||||||
*/
|
*/
|
||||||
public ProcessorTopology build(Integer topicGroupId) {
|
public ProcessorTopology build(String jobId, Integer topicGroupId) {
|
||||||
Set<String> nodeGroup;
|
Set<String> nodeGroup;
|
||||||
if (topicGroupId != null) {
|
if (topicGroupId != null) {
|
||||||
nodeGroup = nodeGroups().get(topicGroupId);
|
nodeGroup = nodeGroups().get(topicGroupId);
|
||||||
|
@ -632,11 +645,11 @@ public class TopologyBuilder {
|
||||||
// when nodeGroup is null, we build the full topology. this is used in some tests.
|
// when nodeGroup is null, we build the full topology. this is used in some tests.
|
||||||
nodeGroup = null;
|
nodeGroup = null;
|
||||||
}
|
}
|
||||||
return build(nodeGroup);
|
return build(jobId, nodeGroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private ProcessorTopology build(Set<String> nodeGroup) {
|
private ProcessorTopology build(String jobId, Set<String> nodeGroup) {
|
||||||
List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
|
List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
|
||||||
Map<String, ProcessorNode> processorMap = new HashMap<>();
|
Map<String, ProcessorNode> processorMap = new HashMap<>();
|
||||||
Map<String, SourceNode> topicSourceMap = new HashMap<>();
|
Map<String, SourceNode> topicSourceMap = new HashMap<>();
|
||||||
|
@ -645,7 +658,7 @@ public class TopologyBuilder {
|
||||||
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
|
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
|
||||||
for (NodeFactory factory : nodeFactories.values()) {
|
for (NodeFactory factory : nodeFactories.values()) {
|
||||||
if (nodeGroup == null || nodeGroup.contains(factory.name)) {
|
if (nodeGroup == null || nodeGroup.contains(factory.name)) {
|
||||||
ProcessorNode node = factory.build();
|
ProcessorNode node = factory.build(jobId);
|
||||||
processorNodes.add(node);
|
processorNodes.add(node);
|
||||||
processorMap.put(node.name(), node);
|
processorMap.put(node.name(), node);
|
||||||
|
|
||||||
|
@ -660,7 +673,12 @@ public class TopologyBuilder {
|
||||||
}
|
}
|
||||||
} else if (factory instanceof SourceNodeFactory) {
|
} else if (factory instanceof SourceNodeFactory) {
|
||||||
for (String topic : ((SourceNodeFactory) factory).topics) {
|
for (String topic : ((SourceNodeFactory) factory).topics) {
|
||||||
topicSourceMap.put(topic, (SourceNode) node);
|
if (internalTopicNames.contains(topic)) {
|
||||||
|
// prefix the job id to the internal topic name
|
||||||
|
topicSourceMap.put(jobId + "-" + topic, (SourceNode) node);
|
||||||
|
} else {
|
||||||
|
topicSourceMap.put(topic, (SourceNode) node);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (factory instanceof SinkNodeFactory) {
|
} else if (factory instanceof SinkNodeFactory) {
|
||||||
for (String parent : ((SinkNodeFactory) factory).parents) {
|
for (String parent : ((SinkNodeFactory) factory).parents) {
|
||||||
|
@ -679,7 +697,15 @@ public class TopologyBuilder {
|
||||||
* Get the names of topics that are to be consumed by the source nodes created by this builder.
|
* Get the names of topics that are to be consumed by the source nodes created by this builder.
|
||||||
* @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
|
* @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
|
||||||
*/
|
*/
|
||||||
public Set<String> sourceTopics() {
|
public Set<String> sourceTopics(String jobId) {
|
||||||
return Collections.unmodifiableSet(sourceTopicNames);
|
Set<String> topics = new HashSet<>();
|
||||||
|
for (String topic : sourceTopicNames) {
|
||||||
|
if (internalTopicNames.contains(topic)) {
|
||||||
|
topics.add(jobId + "-" + topic);
|
||||||
|
} else {
|
||||||
|
topics.add(topic);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Collections.unmodifiableSet(topics);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -117,7 +117,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
|
||||||
streamThread = (StreamThread) o;
|
streamThread = (StreamThread) o;
|
||||||
streamThread.partitionAssignor(this);
|
streamThread.partitionAssignor(this);
|
||||||
|
|
||||||
this.topicGroups = streamThread.builder.topicGroups();
|
this.topicGroups = streamThread.builder.topicGroups(streamThread.jobId);
|
||||||
|
|
||||||
if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
|
if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
|
||||||
internalTopicManager = new InternalTopicManager(
|
internalTopicManager = new InternalTopicManager(
|
||||||
|
@ -350,7 +350,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
|
||||||
topicToTaskIds.putAll(internalSourceTopicToTaskIds);
|
topicToTaskIds.putAll(internalSourceTopicToTaskIds);
|
||||||
|
|
||||||
for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
|
for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
|
||||||
String topic = streamThread.jobId + "-" + entry.getKey();
|
String topic = entry.getKey();
|
||||||
|
|
||||||
// the expected number of partitions is the max value of TaskId.partition + 1
|
// the expected number of partitions is the max value of TaskId.partition + 1
|
||||||
int numPartitions = 0;
|
int numPartitions = 0;
|
||||||
|
@ -445,7 +445,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
|
||||||
|
|
||||||
/* For Test Only */
|
/* For Test Only */
|
||||||
public Set<TaskId> tasksForState(String stateName) {
|
public Set<TaskId> tasksForState(String stateName) {
|
||||||
return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
|
return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.jobId, stateName));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<TaskId> tasksForPartition(TopicPartition partition) {
|
public Set<TaskId> tasksForPartition(TopicPartition partition) {
|
||||||
|
|
|
@ -173,7 +173,7 @@ public class StreamThread extends Thread {
|
||||||
this.jobId = jobId;
|
this.jobId = jobId;
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.builder = builder;
|
this.builder = builder;
|
||||||
this.sourceTopics = builder.sourceTopics();
|
this.sourceTopics = builder.sourceTopics(jobId);
|
||||||
this.clientId = clientId;
|
this.clientId = clientId;
|
||||||
this.processId = processId;
|
this.processId = processId;
|
||||||
this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
|
this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
|
||||||
|
@ -580,7 +580,7 @@ public class StreamThread extends Thread {
|
||||||
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
|
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
|
||||||
sensors.taskCreationSensor.record();
|
sensors.taskCreationSensor.record();
|
||||||
|
|
||||||
ProcessorTopology topology = builder.build(id.topicGroupId);
|
ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
|
||||||
|
|
||||||
return new StreamTask(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
|
return new StreamTask(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
|
||||||
}
|
}
|
||||||
|
@ -650,7 +650,7 @@ public class StreamThread extends Thread {
|
||||||
protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
|
protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
|
||||||
sensors.taskCreationSensor.record();
|
sensors.taskCreationSensor.record();
|
||||||
|
|
||||||
ProcessorTopology topology = builder.build(id.topicGroupId);
|
ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
|
||||||
|
|
||||||
if (!topology.stateStoreSuppliers().isEmpty()) {
|
if (!topology.stateStoreSuppliers().isEmpty()) {
|
||||||
return new StandbyTask(id, jobId, partitions, topology, consumer, restoreConsumer, config, sensors);
|
return new StandbyTask(id, jobId, partitions, topology, consumer, restoreConsumer, config, sensors);
|
||||||
|
|
|
@ -137,6 +137,6 @@ public class KStreamImplTest {
|
||||||
1 + // to
|
1 + // to
|
||||||
2 + // through
|
2 + // through
|
||||||
1, // process
|
1, // process
|
||||||
builder.build(null).processors().size());
|
builder.build("X", null).processors().size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,8 +142,14 @@ public class TopologyBuilderTest {
|
||||||
builder.addSource("source-1", "topic-1");
|
builder.addSource("source-1", "topic-1");
|
||||||
builder.addSource("source-2", "topic-2");
|
builder.addSource("source-2", "topic-2");
|
||||||
builder.addSource("source-3", "topic-3");
|
builder.addSource("source-3", "topic-3");
|
||||||
|
builder.addInternalTopic("topic-3");
|
||||||
|
|
||||||
assertEquals(3, builder.sourceTopics().size());
|
Set<String> expected = new HashSet<String>();
|
||||||
|
expected.add("topic-1");
|
||||||
|
expected.add("topic-2");
|
||||||
|
expected.add("X-topic-3");
|
||||||
|
|
||||||
|
assertEquals(expected, builder.sourceTopics("X"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = TopologyBuilderException.class)
|
@Test(expected = TopologyBuilderException.class)
|
||||||
|
@ -184,13 +190,13 @@ public class TopologyBuilderTest {
|
||||||
|
|
||||||
StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
|
StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
|
||||||
builder.addStateStore(supplier);
|
builder.addStateStore(supplier);
|
||||||
suppliers = builder.build(null).stateStoreSuppliers();
|
suppliers = builder.build("X", null).stateStoreSuppliers();
|
||||||
assertEquals(0, suppliers.size());
|
assertEquals(0, suppliers.size());
|
||||||
|
|
||||||
builder.addSource("source-1", "topic-1");
|
builder.addSource("source-1", "topic-1");
|
||||||
builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
|
builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
|
||||||
builder.connectProcessorAndStateStores("processor-1", "store-1");
|
builder.connectProcessorAndStateStores("processor-1", "store-1");
|
||||||
suppliers = builder.build(null).stateStoreSuppliers();
|
suppliers = builder.build("X", null).stateStoreSuppliers();
|
||||||
assertEquals(1, suppliers.size());
|
assertEquals(1, suppliers.size());
|
||||||
assertEquals(supplier.name(), suppliers.get(0).name());
|
assertEquals(supplier.name(), suppliers.get(0).name());
|
||||||
}
|
}
|
||||||
|
@ -212,7 +218,7 @@ public class TopologyBuilderTest {
|
||||||
|
|
||||||
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
|
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
|
||||||
|
|
||||||
Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
|
Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
|
||||||
|
|
||||||
Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
|
Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
|
||||||
expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
|
expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
|
||||||
|
@ -250,12 +256,12 @@ public class TopologyBuilderTest {
|
||||||
builder.addStateStore(supplier);
|
builder.addStateStore(supplier);
|
||||||
builder.connectProcessorAndStateStores("processor-5", "store-3");
|
builder.connectProcessorAndStateStores("processor-5", "store-3");
|
||||||
|
|
||||||
Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
|
Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
|
||||||
|
|
||||||
Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
|
Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
|
||||||
expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet("store-1" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
|
expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-1"))));
|
||||||
expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), mkSet("store-2" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
|
expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-2"))));
|
||||||
expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), mkSet("store-3" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)));
|
expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-3"))));
|
||||||
|
|
||||||
assertEquals(3, topicGroups.size());
|
assertEquals(3, topicGroups.size());
|
||||||
assertEquals(expectedTopicGroups, topicGroups);
|
assertEquals(expectedTopicGroups, topicGroups);
|
||||||
|
@ -275,9 +281,9 @@ public class TopologyBuilderTest {
|
||||||
builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
|
builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
|
||||||
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
|
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
|
||||||
|
|
||||||
ProcessorTopology topology0 = builder.build(0);
|
ProcessorTopology topology0 = builder.build("X", 0);
|
||||||
ProcessorTopology topology1 = builder.build(1);
|
ProcessorTopology topology1 = builder.build("X", 1);
|
||||||
ProcessorTopology topology2 = builder.build(2);
|
ProcessorTopology topology2 = builder.build("X", 2);
|
||||||
|
|
||||||
assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
|
assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
|
||||||
assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));
|
assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));
|
||||||
|
|
|
@ -97,7 +97,7 @@ public class ProcessorTopologyTest {
|
||||||
builder.addSink("sink-1", "topic-3", "processor-1");
|
builder.addSink("sink-1", "topic-3", "processor-1");
|
||||||
builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
|
builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
|
||||||
|
|
||||||
final ProcessorTopology topology = builder.build(null);
|
final ProcessorTopology topology = builder.build("X", null);
|
||||||
|
|
||||||
assertEquals(6, topology.processors().size());
|
assertEquals(6, topology.processors().size());
|
||||||
|
|
||||||
|
|
|
@ -521,7 +521,7 @@ public class StreamPartitionAssignorTest {
|
||||||
builder.addSink("sink1", "topicX", "processor1");
|
builder.addSink("sink1", "topicX", "processor1");
|
||||||
builder.addSource("source2", "topicX");
|
builder.addSource("source2", "topicX");
|
||||||
builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
|
builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
|
||||||
List<String> topics = Utils.mkList("topic1", "topicX");
|
List<String> topics = Utils.mkList("topic1", "test-topicX");
|
||||||
Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
|
Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
|
||||||
|
|
||||||
UUID uuid1 = UUID.randomUUID();
|
UUID uuid1 = UUID.randomUUID();
|
||||||
|
@ -543,9 +543,7 @@ public class StreamPartitionAssignorTest {
|
||||||
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
|
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
|
||||||
|
|
||||||
// check prepared internal topics
|
// check prepared internal topics
|
||||||
// TODO: we need to change it to 1 after fixing the prefix
|
assertEquals(1, internalTopicManager.readyTopics.size());
|
||||||
assertEquals(2, internalTopicManager.readyTopics.size());
|
|
||||||
assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("topicX"));
|
|
||||||
assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX"));
|
assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -171,7 +171,7 @@ public class StreamThreadTest {
|
||||||
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), new SystemTime()) {
|
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), new SystemTime()) {
|
||||||
@Override
|
@Override
|
||||||
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
|
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
|
||||||
ProcessorTopology topology = builder.build(id.topicGroupId);
|
ProcessorTopology topology = builder.build("X", id.topicGroupId);
|
||||||
return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
|
return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -298,7 +298,7 @@ public class StreamThreadTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
|
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
|
||||||
ProcessorTopology topology = builder.build(id.topicGroupId);
|
ProcessorTopology topology = builder.build("X", id.topicGroupId);
|
||||||
return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
|
return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -420,7 +420,7 @@ public class StreamThreadTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
|
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
|
||||||
ProcessorTopology topology = builder.build(id.topicGroupId);
|
ProcessorTopology topology = builder.build("X", id.topicGroupId);
|
||||||
return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
|
return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class KStreamTestDriver {
|
||||||
File stateDir,
|
File stateDir,
|
||||||
Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
|
Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
|
||||||
Serializer<?> valSerializer, Deserializer<?> valDeserializer) {
|
Serializer<?> valSerializer, Deserializer<?> valDeserializer) {
|
||||||
this.topology = builder.build(null);
|
this.topology = builder.build("X", null);
|
||||||
this.stateDir = stateDir;
|
this.stateDir = stateDir;
|
||||||
this.context = new MockProcessorContext(this, stateDir, keySerializer, keyDeserializer, valSerializer, valDeserializer, new MockRecordCollector());
|
this.context = new MockProcessorContext(this, stateDir, keySerializer, keyDeserializer, valSerializer, valDeserializer, new MockRecordCollector());
|
||||||
|
|
||||||
|
@ -127,7 +127,7 @@ public class KStreamTestDriver {
|
||||||
public MockRecordCollector() {
|
public MockRecordCollector() {
|
||||||
super(null);
|
super(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
|
public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
|
||||||
StreamPartitioner<K, V> partitioner) {
|
StreamPartitioner<K, V> partitioner) {
|
||||||
|
|
|
@ -146,7 +146,7 @@ public class ProcessorTopologyTestDriver {
|
||||||
*/
|
*/
|
||||||
public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) {
|
public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) {
|
||||||
id = new TaskId(0, 0);
|
id = new TaskId(0, 0);
|
||||||
topology = builder.build(null);
|
topology = builder.build("X", null);
|
||||||
|
|
||||||
// Set up the consumer and producer ...
|
// Set up the consumer and producer ...
|
||||||
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
|
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
|
||||||
|
|
|
@ -41,7 +41,6 @@ class StreamsBounceTest(KafkaTest):
|
||||||
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
|
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
|
||||||
self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
|
self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
|
||||||
|
|
||||||
@ignore
|
|
||||||
def test_bounce(self):
|
def test_bounce(self):
|
||||||
"""
|
"""
|
||||||
Start a smoke test client, then abort (kill -9) and restart it a few times.
|
Start a smoke test client, then abort (kill -9) and restart it a few times.
|
||||||
|
|
|
@ -44,7 +44,6 @@ class StreamsSmokeTest(KafkaTest):
|
||||||
self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
|
self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
|
||||||
self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
|
self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
|
||||||
|
|
||||||
@ignore
|
|
||||||
def test_streams(self):
|
def test_streams(self):
|
||||||
"""
|
"""
|
||||||
Start a few smoke test clients, then repeat start a new one, stop (cleanly) running one a few times.
|
Start a few smoke test clients, then repeat start a new one, stop (cleanly) running one a few times.
|
||||||
|
|
Loading…
Reference in New Issue