MINOR: update the branch(split) doc and java doc and tests (#11195)

Reviewers: Ivan Ponomarev <iponomarev@mail.ru>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Luke Chen 2021-08-11 04:37:59 +08:00 committed by GitHub
parent 35ebdd7ed0
commit 2bfd0ae2e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 41 additions and 29 deletions

View File

@ -360,14 +360,14 @@ GlobalKTable&lt;String, Long&gt; wordCounts = builder.globalTable(
<tbody valign="top">
<tr class="row-even"><td><p class="first"><strong>Branch</strong></p>
<ul class="last simple">
<li>KStream &rarr; KStream[]</li>
<li>KStream &rarr; BranchedKStream</li>
</ul>
</td>
<td><p class="first">Branch (or split) a <code class="docutils literal"><span class="pre">KStream</span></code> based on the supplied predicates into one or more <code class="docutils literal"><span class="pre">KStream</span></code> instances.
(<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#split">details</a>)</p>
(<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#split()">details</a>)</p>
<p>Predicates are evaluated in order. A record is placed to one and only one output stream on the first match:
if the n-th predicate evaluates to true, the record is placed to n-th stream. If no predicate matches, the
the record is dropped.</p>
if the n-th predicate evaluates to true, the record is placed to n-th stream. If a record does not match any predicates,
it will be routed to the default branch, or dropped if no default branch is created.</p>
<p>Branching is useful, for example, to route records to different downstream topics.</p>
<pre class="line-numbers"><code class="language-java">KStream&lt;String, Long&gt; stream = ...;
Map&lt;String, KStream&lt;String, Long&gt;&gt; branches =
@ -376,7 +376,7 @@ Map&lt;String, KStream&lt;String, Long&gt;&gt; branches =
Branched.as(&quot;A&quot;))
.branch((key, value) -&gt; key.startsWith(&quot;B&quot;), /* second predicate */
Branched.as(&quot;B&quot;))
.defaultBranch(Branched.as(&quot;C&quot;))
.defaultBranch(Branched.as(&quot;C&quot;)) /* default branch */
);
// KStream branches.get(&quot;Branch-A&quot;) contains all records whose keys start with &quot;A&quot;

View File

@ -143,6 +143,7 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> {
*/
@Override
public Branched<K, V> withName(final String name) {
Objects.requireNonNull(name, "name cannot be null");
return new Branched<>(name, chainFunction, chainConsumer);
}
}

View File

@ -22,7 +22,7 @@ import java.util.Map;
* Branches the records in the original stream based on the predicates supplied for the branch definitions.
* <p>
* Branches are defined with {@link BranchedKStream#branch(Predicate, Branched)} or
* {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against the predicates
* {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against the {@code predicate}
* supplied via {@link Branched} parameters, and is routed to the first branch for which its respective predicate
* evaluates to {@code true}. If a record does not match any predicates, it will be routed to the default branch,
* or dropped if no default branch is created.
@ -50,7 +50,7 @@ import java.util.Map;
* {@link Branched} parameter, its value is appended to the prefix to form the {@code Map} key
* <li>If a name is not provided for the branch, then the key defaults to {@code prefix + position} of the branch
* as a decimal number, starting from {@code "1"}
* <li>If a name is not provided for the {@link BranchedKStream#defaultBranch()} call, then the key defaults
* <li>If a name is not provided for the {@link BranchedKStream#defaultBranch()}, then the key defaults
* to {@code prefix + "0"}
* </ul>
* The values of the respective {@code Map<Stream, KStream<K, V>>} entries are formed as following:
@ -69,7 +69,8 @@ import java.util.Map;
* .branch(predicate1, Branched.as("bar")) // "foo-bar"
* .branch(predicate2, Branched.withConsumer(ks->ks.to("A")) // no entry: a Consumer is provided
* .branch(predicate3, Branched.withFunction(ks->null)) // no entry: chain function returns null
* .branch(predicate4) // "foo-4": name defaults to the branch position
* .branch(predicate4, Branched.withFunction(ks->ks)) // "foo-4": chain function returns non-null value
* .branch(predicate5) // "foo-5": name defaults to the branch position
* .defaultBranch() // "foo-0": "0" is the default name for the default branch
* }</pre>
*

View File

@ -782,18 +782,22 @@ public interface KStream<K, V> {
KStream<K, V>[] branch(final Named named, final Predicate<? super K, ? super V>... predicates);
/**
* Split this stream. {@link BranchedKStream} can be used for routing the records to different branches depending
* on evaluation against the supplied predicates.
* Stream branching is a stateless record-by-record operation.
* Split this stream into different branches. The returned {@link BranchedKStream} instance can be used for routing
* the records to different branches depending on evaluation against the supplied predicates.
* <p>
* Note: Stream branching is a stateless record-by-record operation.
* Please check {@link BranchedKStream} for detailed description and usage example
*
* @return {@link BranchedKStream} that provides methods for routing the records to different branches.
*/
BranchedKStream<K, V> split();
/**
* Split this stream. {@link BranchedKStream} can be used for routing the records to different branches depending
* on evaluation against the supplied predicates.
* Stream branching is a stateless record-by-record operation.
* Split this stream into different branches. The returned {@link BranchedKStream} instance can be used for routing
* the records to different branches depending on evaluation against the supplied predicates.
* <p>
* Note: Stream branching is a stateless record-by-record operation.
* Please check {@link BranchedKStream} for detailed description and usage example
*
* @param named a {@link Named} config used to name the processor in the topology and also to set the name prefix
* for the resulting branches (see {@link BranchedKStream})

View File

@ -50,6 +50,7 @@ public class KStreamSplitTest {
private final Predicate<Integer, String> isMultipleOfThree = (key, value) -> (key % 3) == 0;
private final Predicate<Integer, String> isMultipleOfFive = (key, value) -> (key % 5) == 0;
private final Predicate<Integer, String> isMultipleOfSeven = (key, value) -> (key % 7) == 0;
private final Predicate<Integer, String> isNegative = (key, value) -> key < 0;
private final KStream<Integer, String> source = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
@Test
@ -68,14 +69,14 @@ public class KStreamSplitTest {
final TestOutputTopic<Integer, String> x2 = driver.createOutputTopic("x2", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> x3 = driver.createOutputTopic("x3", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> x5 = driver.createOutputTopic("x5", new IntegerDeserializer(), new StringDeserializer());
assertEquals(Arrays.asList("V2", "V4", "V6"), x2.readValuesToList());
assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), x2.readValuesToList());
assertEquals(Arrays.asList("V3"), x3.readValuesToList());
assertEquals(Arrays.asList("V5"), x5.readValuesToList());
});
}
private void withDriver(final Consumer<TopologyTestDriver> test) {
final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
final int[] expectedKeys = new int[]{-1, 0, 1, 2, 3, 4, 5, 6, 7};
final Topology topology = builder.build();
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
final TestInputTopic<Integer, String> inputTopic = driver.createInputTopic(topicName, new IntegerSerializer(), new StringSerializer());
@ -104,25 +105,29 @@ public class KStreamSplitTest {
// "foo-bar"
.branch(isEven, Branched.as("bar"))
// no entry: a Consumer is provided
.branch(isMultipleOfThree, Branched.withConsumer(ks -> {
}))
.branch(isMultipleOfThree, Branched.withConsumer(ks -> { }))
// no entry: chain function returns null
.branch(isMultipleOfFive, Branched.withFunction(ks -> null))
// "foo-4": name defaults to the branch position
// "foo-4": chain function returns non-null value
.branch(isNegative, Branched.withFunction(ks -> ks))
// "foo-5": name defaults to the branch position
.branch(isMultipleOfSeven)
// "foo-0": "0" is the default name for the default branch
.defaultBranch();
assertEquals(3, branches.size());
branches.get("foo-bar").to("foo-bar");
branches.get("foo-4").to("foo-4");
branches.get("foo-0").to("foo-0");
assertEquals(4, branches.size());
// direct the branched streams into different topics named with branch name
for (final Map.Entry<String, KStream<Integer, String>> branch: branches.entrySet()) {
branch.getValue().to(branch.getKey());
}
builder.build();
withDriver(driver -> {
final TestOutputTopic<Integer, String> even = driver.createOutputTopic("foo-bar", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> x7 = driver.createOutputTopic("foo-4", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> negative = driver.createOutputTopic("foo-4", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> x7 = driver.createOutputTopic("foo-5", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> defaultBranch = driver.createOutputTopic("foo-0", new IntegerDeserializer(), new StringDeserializer());
assertEquals(Arrays.asList("V2", "V4", "V6"), even.readValuesToList());
assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), even.readValuesToList());
assertEquals(Arrays.asList("V-1"), negative.readValuesToList());
assertEquals(Arrays.asList("V7"), x7.readValuesToList());
assertEquals(Arrays.asList("V1"), defaultBranch.readValuesToList());
});
@ -130,14 +135,15 @@ public class KStreamSplitTest {
@Test
public void testBranchingWithNoTerminalOperation() {
final String outputTopicName = "output";
source.split()
.branch(isEven, Branched.withConsumer(ks -> ks.to("output")))
.branch(isMultipleOfFive, Branched.withConsumer(ks -> ks.to("output")));
.branch(isEven, Branched.withConsumer(ks -> ks.to(outputTopicName)))
.branch(isMultipleOfFive, Branched.withConsumer(ks -> ks.to(outputTopicName)));
builder.build();
withDriver(driver -> {
final TestOutputTopic<Integer, String> outputTopic =
driver.createOutputTopic("output", new IntegerDeserializer(), new StringDeserializer());
assertEquals(Arrays.asList("V2", "V4", "V5", "V6"), outputTopic.readValuesToList());
driver.createOutputTopic(outputTopicName, new IntegerDeserializer(), new StringDeserializer());
assertEquals(Arrays.asList("V0", "V2", "V4", "V5", "V6"), outputTopic.readValuesToList());
});
}
}