MINOR: add KStream merge operator

guozhangwang

Added KStreamBuilder.merge(KStream...).

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #536 from ymatsuda/kstream_merge_operator
This commit is contained in:
Yasuhiro Matsuda 2015-11-17 17:34:54 -08:00 committed by Confluent
parent ffc0965d38
commit 1a36af80b7
4 changed files with 67 additions and 3 deletions

View File

@ -68,6 +68,16 @@ public class KStreamBuilder extends TopologyBuilder {
return new KStreamImpl<>(this, name, Collections.singleton(name));
}
/**
* Creates a new stream by merging the given streams
*
* @param streams the streams to be merged
* @return KStream
*/
public <K, V> KStream<K, V> merge(KStream<K, V>... streams) {
return KStreamImpl.merge(this, streams);
}
public String newName(String prefix) {
return prefix + String.format("%010d", index.getAndIncrement());
}

View File

@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.lang.reflect.Array;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
public class KStreamImpl<K, V> implements KStream<K, V> {
@ -65,12 +66,12 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
public static final String JOINOTHER_NAME = "KAFKA-JOINOTHER-";
public static final String JOINMERGE_NAME = "KAFKA-JOINMERGE-";
public static final String MERGE_NAME = "KAFKA-MERGE-";
public static final String SOURCE_NAME = "KAFKA-SOURCE-";
protected final KStreamBuilder topology;
protected final String name;
public final String name;
protected final Set<String> sourceNodes;
public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
@ -161,6 +162,30 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
return branchChildren;
}
public static <K, V> KStream<K, V> merge(KStreamBuilder topology, KStream<K, V>[] streams) {
String name = topology.newName(MERGE_NAME);
String[] parentNames = new String[streams.length];
Set<String> allSourceNodes = new HashSet<>();
for (int i = 0; i < streams.length; i++) {
KStreamImpl stream = (KStreamImpl) streams[i];
parentNames[i] = stream.name;
if (allSourceNodes != null) {
if (stream.sourceNodes != null)
allSourceNodes.addAll(stream.sourceNodes);
else
allSourceNodes = null;
}
}
topology.addProcessor(name, new KStreamPassThrough<>(), parentNames);
return new KStreamImpl<>(topology, name, allSourceNodes);
}
@Override
public <K1, V1> KStream<K1, V1> through(String topic,
Serializer<K> keySerializer,

View File

@ -55,7 +55,7 @@ public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implement
String joinThisName = topology.newName(JOINTHIS_NAME);
String joinOtherName = topology.newName(JOINOTHER_NAME);
String joinMergeName = topology.newName(JOINMERGE_NAME);
String joinMergeName = topology.newName(MERGE_NAME);
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name);

View File

@ -17,8 +17,11 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.processor.TopologyException;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@ -48,4 +51,30 @@ public class KStreamBuilderTest {
assertEquals("Y-0000000001", builder.newName("Y-"));
assertEquals("Z-0000000002", builder.newName("Z-"));
}
@Test
public void testMerge() {
String topic1 = "topic-1";
String topic2 = "topic-2";
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source1 = builder.from(topic1);
KStream<String, String> source2 = builder.from(topic2);
KStream<String, String> merged = builder.merge(source1, source2);
MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
KStreamTestDriver driver = new KStreamTestDriver(builder);
driver.setTime(0L);
driver.process(topic1, "A", "aa");
driver.process(topic2, "B", "bb");
driver.process(topic2, "C", "cc");
driver.process(topic1, "D", "dd");
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
}
}