diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index c8a8bd3be32..a95d08c0e86 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -68,6 +68,16 @@ public class KStreamBuilder extends TopologyBuilder { return new KStreamImpl<>(this, name, Collections.singleton(name)); } + /** + * Creates a new stream by merging the given streams + * + * @param streams the streams to be merged + * @return KStream + */ + public KStream merge(KStream... streams) { + return KStreamImpl.merge(this, streams); + } + public String newName(String prefix) { return prefix + String.format("%010d", index.getAndIncrement()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 09864056f7f..1ac23b2c84e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; import java.lang.reflect.Array; import java.util.Collections; +import java.util.HashSet; import java.util.Set; public class KStreamImpl implements KStream { @@ -65,12 +66,12 @@ public class KStreamImpl implements KStream { public static final String JOINOTHER_NAME = "KAFKA-JOINOTHER-"; - public static final String JOINMERGE_NAME = "KAFKA-JOINMERGE-"; + public static final String MERGE_NAME = "KAFKA-MERGE-"; public static final String SOURCE_NAME = "KAFKA-SOURCE-"; protected final KStreamBuilder topology; - protected final String name; + public final String name; protected final Set sourceNodes; public KStreamImpl(KStreamBuilder topology, String name, Set sourceNodes) { @@ -161,6 +162,30 @@ public class KStreamImpl implements KStream { return branchChildren; } + public static KStream merge(KStreamBuilder topology, KStream[] streams) { + String name = topology.newName(MERGE_NAME); + String[] parentNames = new String[streams.length]; + Set allSourceNodes = new HashSet<>(); + + for (int i = 0; i < streams.length; i++) { + KStreamImpl stream = (KStreamImpl) streams[i]; + + parentNames[i] = stream.name; + + if (allSourceNodes != null) { + if (stream.sourceNodes != null) + allSourceNodes.addAll(stream.sourceNodes); + else + allSourceNodes = null; + } + + } + + topology.addProcessor(name, new KStreamPassThrough<>(), parentNames); + + return new KStreamImpl<>(topology, name, allSourceNodes); + } + @Override public KStream through(String topic, Serializer keySerializer, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java index cb49873a8ff..100fbee26cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java @@ -55,7 +55,7 @@ public final class KStreamWindowedImpl extends KStreamImpl implement String joinThisName = topology.newName(JOINTHIS_NAME); String joinOtherName = topology.newName(JOINOTHER_NAME); - String joinMergeName = topology.newName(JOINMERGE_NAME); + String joinMergeName = topology.newName(MERGE_NAME); topology.addProcessor(joinThisName, joinThis, this.name); topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index cf1cfaa0f98..d6994a9b997 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -17,8 +17,11 @@ package org.apache.kafka.streams.kstream; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.processor.TopologyException; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -48,4 +51,30 @@ public class KStreamBuilderTest { assertEquals("Y-0000000001", builder.newName("Y-")); assertEquals("Z-0000000002", builder.newName("Z-")); } + + @Test + public void testMerge() { + String topic1 = "topic-1"; + String topic2 = "topic-2"; + + KStreamBuilder builder = new KStreamBuilder(); + + KStream source1 = builder.from(topic1); + KStream source2 = builder.from(topic2); + KStream merged = builder.merge(source1, source2); + + MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>(); + merged.process(processorSupplier); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + driver.setTime(0L); + + driver.process(topic1, "A", "aa"); + driver.process(topic2, "B", "bb"); + driver.process(topic2, "C", "cc"); + driver.process(topic1, "D", "dd"); + + assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed); + } + }