diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index c90024f278e..c2835a3cd8c 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -84,7 +84,13 @@
and can be obtained by calling Topology#describe().
An example using this new API is shown in the quickstart section.
-
+
+
+ With the introduction of KIP-202
+ a new method merge() has been added to KStream, and the StreamsBuilder#merge() method has been removed.
+ The method signature was also changed: instead of providing multiple KStreams to the method at once, only a single KStream is accepted.
+
+
New methods in KafkaStreams:
@@ -214,7 +220,9 @@
If exactly-once processing is enabled via the processing.guarantees parameter, internally Streams switches from a producer per thread to a producer per task runtime model.
In order to distinguish the different producers, the producer's client.id additionally encodes the task-ID for this case.
Because the producer's client.id is used to report JMX metrics, it might be required to update tools that receive those metrics.
+
+
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 7e746e6a2ba..94d19aefd76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -59,7 +59,7 @@ public class StreamsBuilder {
final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
-
+
/**
* Create a {@link KStream} from the specified topics.
* The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
@@ -492,18 +492,6 @@ public class StreamsBuilder {
return this;
}
- /**
- * Create a new instance of {@link KStream} by merging the given {@link KStream}s.
- *
- * There is no ordering guarantee for records from different {@link KStream}s.
- *
- * @param streams the {@link KStream}s to be merged
- * @return a {@link KStream} containing all records of the given streams
- */
- public synchronized KStream merge(final KStream... streams) {
- return internalStreamsBuilder.merge(streams);
- }
-
/**
* Returns the {@link Topology} that represents the specified processing logic.
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index f8f99f26e4b..c56a4edced3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -446,6 +446,19 @@ public interface KStream {
* @param printed options for printing
*/
void print(final Printed printed);
+
+ /**
+ * Merge this stream and the given stream into one larger stream.
+ *
+ * There is no ordering guarantee between records from this {@code KStream} and records from
+ * the provided {@code KStream} in the merged stream.
+ * Relative order is preserved within each input stream though (ie, records within one input
+ * stream are processed in order).
+ *
+ * @param stream a stream which is to be merged into this stream
+ * @return a merged stream containing all records from this and the provided {@code KStream}
+ */
+ KStream merge(final KStream stream);
/**
* Write the records of this stream to a file at the given path.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index d1dd6ac8a7b..e7bcc95cacc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -1225,8 +1225,16 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* @return a {@link KStream} containing all records of the given streams
*/
public KStream merge(final KStream... streams) {
+ Objects.requireNonNull(streams, "streams can't be null");
+ if (streams.length <= 1) {
+ throw new IllegalArgumentException("Number of arguments required needs to be greater than one.");
+ }
try {
- return KStreamImpl.merge(internalStreamsBuilder, streams);
+ KStream mergedStream = streams[0];
+ for (int i = 1; i < streams.length; i++) {
+ mergedStream = mergedStream.merge(streams[i]);
+ }
+ return mergedStream;
} catch (final org.apache.kafka.streams.errors.TopologyException e) {
throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 3963657f410..fa696fefdaf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -159,11 +159,6 @@ public class InternalStreamsBuilder {
return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier(storeBuilder.name()));
}
-
- public KStream merge(final KStream... streams) {
- return KStreamImpl.merge(this, streams);
- }
-
String newName(final String prefix) {
return prefix + String.format("%010d", index.getAndIncrement());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index cbaf95acad7..ae3808e825a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -183,7 +183,7 @@ public class KStreamImpl extends AbstractStream implements KStream(builder, name, sourceNodes, this.repartitionRequired);
}
-
+
@Override
public void print() {
print(defaultKeyValueMapper, null, null, this.name);
@@ -346,24 +346,22 @@ public class KStreamImpl extends AbstractStream implements KStream KStream merge(final InternalStreamsBuilder builder,
- final KStream[] streams) {
- if (streams == null || streams.length == 0) {
- throw new IllegalArgumentException("Parameter must not be null or has length zero");
- }
-
+ @Override
+ public KStream merge(final KStream stream) {
+ Objects.requireNonNull(stream);
+ return merge(builder, stream);
+ }
+
+ private KStream merge(final InternalStreamsBuilder builder,
+ final KStream stream) {
+ KStreamImpl streamImpl = (KStreamImpl) stream;
String name = builder.newName(MERGE_NAME);
- String[] parentNames = new String[streams.length];
+ String[] parentNames = {this.name, streamImpl.name};
Set allSourceNodes = new HashSet<>();
- boolean requireRepartitioning = false;
- for (int i = 0; i < streams.length; i++) {
- KStreamImpl stream = (KStreamImpl) streams[i];
-
- parentNames[i] = stream.name;
- requireRepartitioning |= stream.repartitionRequired;
- allSourceNodes.addAll(stream.sourceNodes);
- }
+ boolean requireRepartitioning = streamImpl.repartitionRequired || repartitionRequired;
+ allSourceNodes.addAll(sourceNodes);
+ allSourceNodes.addAll(streamImpl.sourceNodes);
builder.internalTopologyBuilder.addProcessor(name, new KStreamPassThrough<>(), parentNames);
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 4ce202b94ab..33ede933d77 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -94,7 +94,7 @@ public class StreamsBuilderTest {
assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed);
assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed);
}
-
+
@Test
public void testMerge() {
final String topic1 = "topic-1";
@@ -102,7 +102,7 @@ public class StreamsBuilderTest {
final KStream source1 = builder.stream(topic1);
final KStream source2 = builder.stream(topic2);
- final KStream merged = builder.merge(source1, source2);
+ final KStream merged = source1.merge(source2);
final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
@@ -160,7 +160,7 @@ public class StreamsBuilderTest {
assertThat(store.get(1L), equalTo("value1"));
assertThat(store.get(2L), equalTo("value2"));
}
-
+
@Test(expected = TopologyException.class)
public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
builder.stream(Collections.emptyList());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 6a6d2a49682..c0bfa99a2e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -185,7 +185,7 @@ public class KStreamBuilderTest {
}
});
- final KStream merged = builder.merge(processedSource1, processedSource2, source3);
+ final KStream merged = processedSource1.merge(processedSource2).merge(source3);
merged.groupByKey().count("my-table");
final Map> actual = builder.stateStoreNameToSourceTopics();
assertEquals(Utils.mkList("topic-1", "topic-2", "topic-3"), actual.get("my-table"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 494e197f0cd..68d0e244db9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -130,7 +130,7 @@ public class InternalStreamsBuilderTest {
}
});
- final KStream merged = builder.merge(processedSource1, processedSource2, source3);
+ final KStream merged = processedSource1.merge(processedSource2).merge(source3);
merged.groupByKey().count("my-table");
final Map> actual = builder.internalTopologyBuilder.stateStoreNameToSourceTopics();
assertEquals(Utils.mkList("topic-1", "topic-2", "topic-3"), actual.get("my-table"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index be1d8651b06..0a0232c16e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -468,5 +469,59 @@ public class KStreamImplTest {
public void shouldThrowNullPointerOnOuterJoinJoinedIsNull() {
testStream.outerJoin(testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10), null);
}
+
+ @Test
+ public void shouldMergeTwoStreams() {
+ final String topic1 = "topic-1";
+ final String topic2 = "topic-2";
+ final KStream source1 = builder.stream(topic1);
+ final KStream source2 = builder.stream(topic2);
+ final KStream merged = source1.merge(source2);
+
+ final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>();
+ merged.process(processorSupplier);
+
+ driver.setUp(builder);
+ driver.setTime(0L);
+
+ driver.process(topic1, "A", "aa");
+ driver.process(topic2, "B", "bb");
+ driver.process(topic2, "C", "cc");
+ driver.process(topic1, "D", "dd");
+
+ assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
+ }
+
+ @Test
+ public void shouldMergeMultipleStreams() {
+ final String topic1 = "topic-1";
+ final String topic2 = "topic-2";
+ final String topic3 = "topic-3";
+ final String topic4 = "topic-4";
+
+ final KStream source1 = builder.stream(topic1);
+ final KStream source2 = builder.stream(topic2);
+ final KStream source3 = builder.stream(topic3);
+ final KStream source4 = builder.stream(topic4);
+ final KStream merged = source1.merge(source2).merge(source3).merge(source4);
+
+ final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>();
+ merged.process(processorSupplier);
+
+ driver.setUp(builder);
+ driver.setTime(0L);
+
+ driver.process(topic1, "A", "aa");
+ driver.process(topic2, "B", "bb");
+ driver.process(topic3, "C", "cc");
+ driver.process(topic4, "D", "dd");
+ driver.process(topic4, "E", "ee");
+ driver.process(topic3, "F", "ff");
+ driver.process(topic2, "G", "gg");
+ driver.process(topic1, "H", "hh");
+
+ assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
+ processorSupplier.processed);
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index f3dbb32d14a..8e5d90dc9ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -83,7 +83,7 @@ public class StreamsMetadataStateTest {
.groupByKey()
.count("table-three");
- builder.merge(one, two).groupByKey().count("merged-table");
+ one.merge(two).groupByKey().count("merged-table");
builder.stream("topic-four").mapValues(new ValueMapper