KAFKA-5765; Move merge() from StreamsBuilder to KStream

This is the polished version.
1. The old merge() method in StreamsBuilder has been removed.
2. The merge() method in KStreamBuilder was changed to fold its variable arguments into repeated calls to the new single-argument merge(), rather than using the varargs implementation in KStreamImpl.
3. A merge() method taking a single (final) KStream argument has been declared in KStream, and tests have been added to verify correctness.
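
For illustration, a minimal before/after sketch of the API change (topic names are hypothetical, not part of this patch):

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> left = builder.stream("left-topic");    // hypothetical topic
    KStream<String, String> right = builder.stream("right-topic");  // hypothetical topic

    // Before this patch (now removed): varargs merge on the builder
    // KStream<String, String> merged = builder.merge(left, right);

    // After KIP-202: single-argument merge on the stream itself
    KStream<String, String> merged = left.merge(right);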

Author: Richard Yu <richardyu@Richards-Air.attlocal.net>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Damian Guy <damian.guy@gmail.com>

Closes #3916 from ConcurrencyPractitioner/trunk
Richard Yu 2017-09-26 09:42:53 +01:00 committed by Damian Guy
parent 4e43a7231d
commit b8be86b805
11 changed files with 107 additions and 42 deletions


@@ -85,6 +85,12 @@
An example using this new API is shown in the <a href="/{{version}}/documentation/streams/quickstart">quickstart section</a>.
</p>
<p>
With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-202+Move+merge%28%29+from+StreamsBuilder+to+KStream">KIP-202</a>
a new method <code>merge()</code> has been created in <code>KStream</code>, and <code>StreamsBuilder#merge()</code> has been removed.
The method signature was also changed: instead of providing multiple <code>KStream</code>s to the method at once, only a single <code>KStream</code> is accepted.
</p>
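
To merge more than two streams under the new API, merge() calls can simply be chained; a brief sketch (stream names hypothetical):

    KStream<String, String> merged = stream1.merge(stream2).merge(stream3);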
<p>
New methods in <code>KafkaStreams</code>:
</p>
@@ -214,7 +220,9 @@
If exactly-once processing is enabled via the <code>processing.guarantee</code> parameter, Streams internally switches from a producer-per-thread to a producer-per-task runtime model.
In order to distinguish the different producers, the producer's <code>client.id</code> additionally encodes the task-ID in this case.
Because the producer's <code>client.id</code> is used to report JMX metrics, it might be required to update tools that receive those metrics.
</p>
<p> Producer's <code>client.id</code> naming schema: </p>
<ul>
<li> at-least-once (default): <code>[client.id]-StreamThread-[sequence-number]</code> </li>
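
A sketch of enabling this mode (the StreamsConfig constants shown exist in this release; the application id and bootstrap server are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    StreamsBuilder builder = new StreamsBuilder();
    // ... define the topology on builder ...

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
    // Switch to exactly-once processing: internally Streams moves from one producer
    // per thread to one producer per task, and each producer's client.id encodes
    // the task id, which changes the JMX metric names mentioned above.
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

    KafkaStreams streams = new KafkaStreams(builder.build(), props);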


@@ -492,18 +492,6 @@ public class StreamsBuilder {
return this;
}
- /**
- * Create a new instance of {@link KStream} by merging the given {@link KStream}s.
- * <p>
- * There is no ordering guarantee for records from different {@link KStream}s.
- *
- * @param streams the {@link KStream}s to be merged
- * @return a {@link KStream} containing all records of the given streams
- */
- public synchronized <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
- return internalStreamsBuilder.merge(streams);
- }
/**
* Returns the {@link Topology} that represents the specified processing logic.
*


@@ -447,6 +447,19 @@ public interface KStream<K, V> {
*/
void print(final Printed<K, V> printed);
/**
* Merge this stream and the given stream into one larger stream.
* <p>
* There is no ordering guarantee between records from this {@code KStream} and records from
* the provided {@code KStream} in the merged stream.
* Relative order is preserved within each input stream though (i.e., records within one input
* stream are processed in order).
*
* @param stream a stream which is to be merged into this stream
* @return a merged stream containing all records from this and the provided {@code KStream}
*/
KStream<K, V> merge(final KStream<K, V> stream);
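
A short usage sketch of the new method (topic names hypothetical; default serdes assumed): only streams with matching key and value types can be merged, and the result behaves like any other stream.

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Long> euOrders = builder.stream("orders-eu");
    KStream<String, Long> usOrders = builder.stream("orders-us");
    // Cross-stream record order is unspecified; order within each input stream
    // is preserved.
    euOrders.merge(usOrders).to("orders-all");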
/**
* Write the records of this stream to a file at the given path.
* This function will use the generated name of the parent processor node to label the key/value pairs printed to


@@ -1225,8 +1225,16 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* @return a {@link KStream} containing all records of the given streams
*/
public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
+ Objects.requireNonNull(streams, "streams can't be null");
+ if (streams.length <= 1) {
+ throw new IllegalArgumentException("Number of arguments required needs to be greater than one.");
+ }
try {
- return KStreamImpl.merge(internalStreamsBuilder, streams);
+ KStream<K, V> mergedStream = streams[0];
+ for (int i = 1; i < streams.length; i++) {
+ mergedStream = mergedStream.merge(streams[i]);
+ }
+ return mergedStream;
} catch (final org.apache.kafka.streams.errors.TopologyException e) {
throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
}
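
The deprecated varargs overload above now reduces to pairwise calls of the new single-stream merge(). The same left-fold pattern in isolation (an illustrative sketch, not code from this patch):

    import org.apache.kafka.streams.kstream.KStream;

    @SafeVarargs
    static <K, V> KStream<K, V> mergeAll(final KStream<K, V> first,
                                         final KStream<K, V>... rest) {
        KStream<K, V> merged = first;
        for (final KStream<K, V> next : rest) {
            merged = merged.merge(next);   // fold each stream into the running result
        }
        return merged;
    }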


@@ -159,11 +159,6 @@ public class InternalStreamsBuilder {
return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
}
- public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
- return KStreamImpl.merge(this, streams);
- }
String newName(final String prefix) {
return prefix + String.format("%010d", index.getAndIncrement());
}


@@ -346,24 +346,22 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
return branchChildren;
}
- public static <K, V> KStream<K, V> merge(final InternalStreamsBuilder builder,
- final KStream<K, V>[] streams) {
- if (streams == null || streams.length == 0) {
- throw new IllegalArgumentException("Parameter <streams> must not be null or has length zero");
+ @Override
+ public KStream<K, V> merge(final KStream<K, V> stream) {
+ Objects.requireNonNull(stream);
+ return merge(builder, stream);
}
+ private KStream<K, V> merge(final InternalStreamsBuilder builder,
+ final KStream<K, V> stream) {
+ KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) stream;
String name = builder.newName(MERGE_NAME);
- String[] parentNames = new String[streams.length];
+ String[] parentNames = {this.name, streamImpl.name};
Set<String> allSourceNodes = new HashSet<>();
- boolean requireRepartitioning = false;
- for (int i = 0; i < streams.length; i++) {
- KStreamImpl<K, V> stream = (KStreamImpl<K, V>) streams[i];
- parentNames[i] = stream.name;
- requireRepartitioning |= stream.repartitionRequired;
- allSourceNodes.addAll(stream.sourceNodes);
- }
+ boolean requireRepartitioning = streamImpl.repartitionRequired || repartitionRequired;
+ allSourceNodes.addAll(sourceNodes);
+ allSourceNodes.addAll(streamImpl.sourceNodes);
builder.internalTopologyBuilder.addProcessor(name, new KStreamPassThrough<>(), parentNames);


@@ -102,7 +102,7 @@ public class StreamsBuilderTest {
final KStream<String, String> source1 = builder.stream(topic1);
final KStream<String, String> source2 = builder.stream(topic2);
- final KStream<String, String> merged = builder.merge(source1, source2);
+ final KStream<String, String> merged = source1.merge(source2);
final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);


@@ -185,7 +185,7 @@ public class KStreamBuilderTest {
}
});
- final KStream<String, String> merged = builder.merge(processedSource1, processedSource2, source3);
+ final KStream<String, String> merged = processedSource1.merge(processedSource2).merge(source3);
merged.groupByKey().count("my-table");
final Map<String, List<String>> actual = builder.stateStoreNameToSourceTopics();
assertEquals(Utils.mkList("topic-1", "topic-2", "topic-3"), actual.get("my-table"));


@@ -130,7 +130,7 @@ public class InternalStreamsBuilderTest {
}
});
- final KStream<String, String> merged = builder.merge(processedSource1, processedSource2, source3);
+ final KStream<String, String> merged = processedSource1.merge(processedSource2).merge(source3);
merged.groupByKey().count("my-table");
final Map<String, List<String>> actual = builder.internalTopologyBuilder.stateStoreNameToSourceTopics();
assertEquals(Utils.mkList("topic-1", "topic-2", "topic-3"), actual.get("my-table"));


@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -469,4 +470,58 @@ public class KStreamImplTest {
testStream.outerJoin(testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10), null);
}
@Test
public void shouldMergeTwoStreams() {
final String topic1 = "topic-1";
final String topic2 = "topic-2";
final KStream<String, String> source1 = builder.stream(topic1);
final KStream<String, String> source2 = builder.stream(topic2);
final KStream<String, String> merged = source1.merge(source2);
final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
driver.setUp(builder);
driver.setTime(0L);
driver.process(topic1, "A", "aa");
driver.process(topic2, "B", "bb");
driver.process(topic2, "C", "cc");
driver.process(topic1, "D", "dd");
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
}
@Test
public void shouldMergeMultipleStreams() {
final String topic1 = "topic-1";
final String topic2 = "topic-2";
final String topic3 = "topic-3";
final String topic4 = "topic-4";
final KStream<String, String> source1 = builder.stream(topic1);
final KStream<String, String> source2 = builder.stream(topic2);
final KStream<String, String> source3 = builder.stream(topic3);
final KStream<String, String> source4 = builder.stream(topic4);
final KStream<String, String> merged = source1.merge(source2).merge(source3).merge(source4);
final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);
driver.setUp(builder);
driver.setTime(0L);
driver.process(topic1, "A", "aa");
driver.process(topic2, "B", "bb");
driver.process(topic3, "C", "cc");
driver.process(topic4, "D", "dd");
driver.process(topic4, "E", "ee");
driver.process(topic3, "F", "ff");
driver.process(topic2, "G", "gg");
driver.process(topic1, "H", "hh");
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
processorSupplier.processed);
}
}


@@ -83,7 +83,7 @@ public class StreamsMetadataStateTest {
.groupByKey()
.count("table-three");
- builder.merge(one, two).groupByKey().count("merged-table");
+ one.merge(two).groupByKey().count("merged-table");
builder.stream("topic-four").mapValues(new ValueMapper<Object, Object>() {
@Override