KAFKA-5488: Add type-safe split() operator (#9107)

Implements KIP-418, which deprecates the `branch()` operator in favor of the newly added, type-safe `split()` operator.

Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>
Ivan Ponomarev 2021-02-05 03:23:35 +03:00 committed by GitHub
parent d98df7fc4d
commit 5552da3a20
20 changed files with 1095 additions and 70 deletions


@ -334,7 +334,7 @@
generic functions where users could specify the input and output data types.</p>
<p>Some KStream transformations may generate one or more KStream objects, for example:
- <code class="docutils literal"><span class="pre">filter</span></code> and <code class="docutils literal"><span class="pre">map</span></code> on a KStream will generate another KStream
- <code class="docutils literal"><span class="pre">branch</span></code> on KStream can generate multiple KStreams</p>
- <code class="docutils literal"><span class="pre">split</span></code> on KStream can generate multiple KStreams</p>
<p>Some others may generate a KTable object, for example an aggregation of a KStream also yields a KTable. This allows Kafka Streams to continuously update the computed value upon arrivals of <a class="reference internal" href="../core-concepts.html#streams_concepts_aggregations"><span class="std std-ref">out-of-order records</span></a> after it
has already been produced to the downstream transformation operators.</p>
<p>All KTable transformation operations can only generate another KTable. However, the Kafka Streams DSL does provide a special function
@ -366,24 +366,28 @@
</ul>
</td>
<td><p class="first">Branch (or split) a <code class="docutils literal"><span class="pre">KStream</span></code> based on the supplied predicates into one or more <code class="docutils literal"><span class="pre">KStream</span></code> instances.
(<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#branch-org.apache.kafka.streams.kstream.Predicate...-">details</a>)</p>
(<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#split">details</a>)</p>
<p>Predicates are evaluated in order. A record is placed to one and only one output stream on the first match:
if the n-th predicate evaluates to true, the record is placed to the n-th stream. If no predicate matches,
the record is dropped.</p>
<p>Branching is useful, for example, to route records to different downstream topics.</p>
<div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;[]</span> <span class="n">branches</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">branch</span><span class="o">(</span>
<span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">key</span><span class="o">.</span><span class="na">startsWith</span><span class="o">(</span><span class="s">&quot;A&quot;</span><span class="o">),</span> <span class="cm">/* first predicate */</span>
<span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">key</span><span class="o">.</span><span class="na">startsWith</span><span class="o">(</span><span class="s">&quot;B&quot;</span><span class="o">),</span> <span class="cm">/* second predicate */</span>
<span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="kc">true</span> <span class="cm">/* third predicate */</span>
<span class="o">);</span>
<pre class="brush: java;">
KStream<String, Long> stream = ...;
Map<String, KStream<String, Long>> branches =
stream.split(Named.as("Branch-"))
.branch((key, value) -> key.startsWith("A"), /* first predicate */
Branched.as("A"))
.branch((key, value) -> key.startsWith("B"), /* second predicate */
Branched.as("B"))
       .defaultBranch(Branched.as("C"));
<span class="c1">// KStream branches[0] contains all records whose keys start with &quot;A&quot;</span>
<span class="c1">// KStream branches[1] contains all records whose keys start with &quot;B&quot;</span>
<span class="c1">// KStream branches[2] contains all other records</span>
// KStream branches.get("Branch-A") contains all records whose keys start with "A"
// KStream branches.get("Branch-B") contains all records whose keys start with "B"
// KStream branches.get("Branch-C") contains all other records
<span class="c1">// Java 7 example: cf. `filter` for how to create `Predicate` instances</span></code></pre></div>
</div>
// Java 7 example: cf. `filter` for how to create `Predicate` instances
</pre>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Filter</strong></p>


@ -21,6 +21,12 @@
<h5><a id="upgrade_280_notable" href="#upgrade_280_notable">Notable changes in 2.8.0</a></h5>
<ul>
<li>Kafka Streams introduces the type-safe <code>split()</code> operator as a substitute for the deprecated <code>KStream#branch()</code> method
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream">KIP-418</a>).
</li>
</ul>
<h4><a id="upgrade_2_7_0" href="#upgrade_2_7_0">Upgrading to 2.7.0 from any version 0.8.x through 2.6.x</a></h4>
<p><b>If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.


@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* The {@code Branched} class is used to define the optional parameters when building branches with
* {@link BranchedKStream}.
*
* @param <K> type of record key
* @param <V> type of record value
*/
public class Branched<K, V> implements NamedOperation<Branched<K, V>> {
protected final String name;
protected final Function<? super KStream<K, V>, ? extends KStream<K, V>> chainFunction;
protected final Consumer<? super KStream<K, V>> chainConsumer;
protected Branched(final String name,
final Function<? super KStream<K, V>, ? extends KStream<K, V>> chainFunction,
final Consumer<? super KStream<K, V>> chainConsumer) {
this.name = name;
this.chainFunction = chainFunction;
this.chainConsumer = chainConsumer;
}
/**
* Create an instance of {@code Branched} with provided branch name suffix.
*
* @param name the branch name suffix to be used (see {@link BranchedKStream} description for details)
* @param <K> key type
* @param <V> value type
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> as(final String name) {
Objects.requireNonNull(name, "name cannot be null");
return new Branched<>(name, null, null);
}
/**
* Create an instance of {@code Branched} with provided chain function.
*
* @param chain A function that will be applied to the branch. If the provided function returns
* {@code null}, its result is ignored, otherwise it is added to the {@code Map} returned
* by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see
* {@link BranchedKStream} description for details).
* @param <K> key type
* @param <V> value type
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> withFunction(
final Function<? super KStream<K, V>, ? extends KStream<K, V>> chain) {
Objects.requireNonNull(chain, "chain function cannot be null");
return new Branched<>(null, chain, null);
}
/**
* Create an instance of {@code Branched} with provided chain consumer.
*
* @param chain A consumer to which the branch will be sent. If a consumer is provided,
* the respective branch will not be added to the resulting {@code Map} returned
* by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see
* {@link BranchedKStream} description for details).
* @param <K> key type
* @param <V> value type
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> withConsumer(final Consumer<KStream<K, V>> chain) {
Objects.requireNonNull(chain, "chain consumer cannot be null");
return new Branched<>(null, null, chain);
}
/**
* Create an instance of {@code Branched} with provided chain function and branch name suffix.
*
* @param chain A function that will be applied to the branch. If the provided function returns
* {@code null}, its result is ignored, otherwise it is added to the {@code Map} returned
* by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see
* {@link BranchedKStream} description for details).
* @param name the branch name suffix to be used. If {@code null}, a default branch name suffix will be generated
* (see {@link BranchedKStream} description for details)
* @param <K> key type
* @param <V> value type
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> withFunction(
final Function<? super KStream<K, V>, ? extends KStream<K, V>> chain, final String name) {
Objects.requireNonNull(chain, "chain function cannot be null");
return new Branched<>(name, chain, null);
}
/**
* Create an instance of {@code Branched} with provided chain consumer and branch name suffix.
*
* @param chain A consumer to which the branch will be sent. If a non-null consumer is provided,
* the respective branch will not be added to the resulting {@code Map} returned
* by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see
* {@link BranchedKStream} description for details).
* @param name the branch name suffix to be used. If {@code null}, a default branch name suffix will be generated
* (see {@link BranchedKStream} description for details)
* @param <K> key type
* @param <V> value type
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> withConsumer(final Consumer<? super KStream<K, V>> chain,
final String name) {
Objects.requireNonNull(chain, "chain consumer cannot be null");
return new Branched<>(name, null, chain);
}
/**
* Create an instance of {@code Branched} from an existing instance.
*
* @param branched the instance of {@code Branched} to copy
*/
protected Branched(final Branched<K, V> branched) {
this(branched.name, branched.chainFunction, branched.chainConsumer);
}
/**
* Configure the instance of {@code Branched} with a branch name suffix.
*
* @param name the branch name suffix to be used. If {@code null} a default branch name suffix will be generated (see
* {@link BranchedKStream} description for details)
* @return {@code this}
*/
@Override
public Branched<K, V> withName(final String name) {
return new Branched<>(name, chainFunction, chainConsumer);
}
}


@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import java.util.Map;
/**
* Branches the records in the original stream based on the predicates supplied for the branch definitions.
* <p>
* Branches are defined with {@link BranchedKStream#branch(Predicate, Branched)} or
* {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against the predicates
* supplied via {@link Branched} parameters, and is routed to the first branch for which its respective predicate
* evaluates to {@code true}. If a record does not match any predicates, it will be routed to the default branch,
* or dropped if no default branch is created.
* <p>
 * Each branch (which is a {@link KStream} instance) can then be processed either by
 * a {@link java.util.function.Function} or a {@link java.util.function.Consumer} provided via a {@link Branched}
 * parameter. If certain conditions are met, it can also be accessed from the {@link Map} returned by an optional
* {@link BranchedKStream#defaultBranch(Branched)} or {@link BranchedKStream#noDefaultBranch()} method call
* (see <a href="#examples">usage examples</a>).
* <p>
* The branching happens on a first-match basis: A record in the original stream is assigned to the corresponding result
* stream for the first predicate that evaluates to {@code true}, and is assigned to this stream only. If you need
* to route a record to multiple streams, you can apply multiple {@link KStream#filter(Predicate)} operators
* to the same {@link KStream} instance, one for each predicate, instead of branching.
* <p>
* The process of routing the records to different branches is a stateless record-by-record operation.
*
* <h2><a name="maprules">Rules of forming the resulting map</a></h2>
* The keys of the {@code Map<String, KStream<K, V>>} entries returned by {@link BranchedKStream#defaultBranch(Branched)} or
* {@link BranchedKStream#noDefaultBranch()} are defined by the following rules:
* <ul>
* <li>If {@link Named} parameter was provided for {@link KStream#split(Named)}, its value is used as
* a prefix for each key. By default, no prefix is used
* <li>If a branch name is provided in {@link BranchedKStream#branch(Predicate, Branched)} via the
* {@link Branched} parameter, its value is appended to the prefix to form the {@code Map} key
* <li>If a name is not provided for the branch, then the key defaults to {@code prefix + position} of the branch
* as a decimal number, starting from {@code "1"}
* <li>If a name is not provided for the {@link BranchedKStream#defaultBranch()} call, then the key defaults
* to {@code prefix + "0"}
* </ul>
 * The values of the respective {@code Map<String, KStream<K, V>>} entries are formed as follows:
* <ul>
* <li>If no chain function or consumer is provided in {@link BranchedKStream#branch(Predicate, Branched)} via
 * the {@link Branched} parameter, then the branch itself is added to the {@code Map}
* <li>If chain function is provided and it returns a non-null value for a given branch, then the value
* is the result returned by this function
* <li>If a chain function returns {@code null} for a given branch, then no entry is added to the map
* <li>If a consumer is provided for a given branch, then no entry is added to the map
* </ul>
* For example:
* <pre> {@code
* Map<String, KStream<..., ...>> result =
* source.split(Named.as("foo-"))
* .branch(predicate1, Branched.as("bar")) // "foo-bar"
 * .branch(predicate2, Branched.withConsumer(ks->ks.to("A"))) // no entry: a Consumer is provided
* .branch(predicate3, Branched.withFunction(ks->null)) // no entry: chain function returns null
* .branch(predicate4) // "foo-4": name defaults to the branch position
* .defaultBranch() // "foo-0": "0" is the default name for the default branch
* }</pre>
*
* <h2><a name="examples">Usage examples</a></h2>
*
* <h3>Direct Branch Consuming</h3>
 * In many cases we do not need a single scope for all the branches; each branch can be processed completely
 * independently of the others. Then we can use 'consuming' lambdas or method references in the {@link Branched} parameter:
*
* <pre> {@code
* source.split()
* .branch(predicate1, Branched.withConsumer(ks -> ks.to("A")))
* .branch(predicate2, Branched.withConsumer(ks -> ks.to("B")))
* .defaultBranch(Branched.withConsumer(ks->ks.to("C")));
* }</pre>
*
* <h3>Collecting branches in a single scope</h3>
* In other cases we want to combine branches again after splitting. The map returned by
* {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} methods provides
* access to all the branches in the same scope:
*
* <pre> {@code
* Map<String, KStream<String, String>> branches = source.split(Named.as("split-"))
 * .branch((key, value) -> value == null, Branched.withFunction(s -> s.mapValues(v->"NULL"), "null"))
* .defaultBranch(Branched.as("non-null"));
*
* KStream<String, String> merged = branches.get("split-non-null").merge(branches.get("split-null"));
* }</pre>
*
* <h3>Dynamic branching</h3>
 * There is also the case when we might need to create branches dynamically, e.g., one per enum value:
*
* <pre> {@code
* BranchedKStream branched = stream.split();
* for (RecordType recordType : RecordType.values())
* branched.branch((k, v) -> v.getRecType() == recordType,
* Branched.withConsumer(recordType::processRecords));
* }</pre>
*
* @param <K> Type of keys
* @param <V> Type of values
* @see KStream
*/
public interface BranchedKStream<K, V> {
/**
* Define a branch for records that match the predicate.
*
* @param predicate A {@link Predicate} instance, against which each record will be evaluated.
* If this predicate returns {@code true} for a given record, the record will be
* routed to the current branch and will not be evaluated against the predicates
* for the remaining branches.
* @return {@code this} to facilitate method chaining
*/
BranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate);
/**
* Define a branch for records that match the predicate.
*
* @param predicate A {@link Predicate} instance, against which each record will be evaluated.
* If this predicate returns {@code true} for a given record, the record will be
* routed to the current branch and will not be evaluated against the predicates
* for the remaining branches.
 * @param branched A {@link Branched} parameter that allows defining a branch name, an in-place
* branch consumer or branch mapper (see <a href="#examples">code examples</a>
* for {@link BranchedKStream})
* @return {@code this} to facilitate method chaining
*/
BranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate, Branched<K, V> branched);
/**
 * Finalize the construction of branches and define the default branch for records not matched
 * by any other predicate. Calling {@code defaultBranch} or {@link #noDefaultBranch()} is optional.
*
* @return {@link Map} of named branches. For rules of forming the resulting map, see {@code BranchedKStream}
* <a href="#maprules">description</a>.
*/
Map<String, KStream<K, V>> defaultBranch();
/**
 * Finalize the construction of branches and define the default branch for records not matched
 * by any other predicate. Calling {@code defaultBranch} or {@link #noDefaultBranch()} is optional.
*
 * @param branched A {@link Branched} parameter that allows defining a branch name, an in-place
* branch consumer or branch mapper (see <a href="#examples">code examples</a>
* for {@link BranchedKStream})
* @return {@link Map} of named branches. For rules of forming the resulting map, see {@link BranchedKStream}
* <a href="#maprules">description</a>.
*/
Map<String, KStream<K, V>> defaultBranch(Branched<K, V> branched);
/**
 * Finalize the construction of branches without forming a default branch. Calling {@code noDefaultBranch()}
* or {@link #defaultBranch()} is optional.
*
* @return {@link Map} of named branches. For rules of forming the resulting map, see {@link BranchedKStream}
* <a href="#maprules">description</a>.
*/
Map<String, KStream<K, V>> noDefaultBranch();
}
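As a plain-Java illustration of the key-naming rules in the javadoc above (a hypothetical stand-alone model, not part of the Kafka Streams API — `BranchKeyRules` and `resolveKeys` are invented names), the prefix/suffix/position resolution can be sketched as:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Illustrative model of the documented map-key rules: the prefix comes from
// KStream#split(Named), the suffix from Branched.as(...), the 1-based branch
// position is the fallback, and the default branch falls back to suffix "0".
public class BranchKeyRules {
    public static List<String> resolveKeys(final String prefix,
                                           final List<Optional<String>> branchSuffixes,
                                           final Optional<String> defaultSuffix) {
        final List<String> keys = new ArrayList<>();
        for (int i = 0; i < branchSuffixes.size(); i++) {
            // unnamed branches use their position, counted from 1
            keys.add(prefix + branchSuffixes.get(i).orElse(String.valueOf(i + 1)));
        }
        // the default branch uses "0" unless a suffix is supplied
        keys.add(prefix + defaultSuffix.orElse("0"));
        return keys;
    }

    public static void main(final String[] args) {
        // One named branch, one unnamed branch, unnamed default branch:
        System.out.println(resolveKeys("foo-",
                Arrays.asList(Optional.of("bar"), Optional.empty()),
                Optional.empty())); // [foo-bar, foo-2, foo-0]
    }
}
```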


@ -756,7 +756,9 @@ public interface KStream<K, V> {
*
* @param predicates the ordered list of {@link Predicate} instances
* @return multiple distinct substreams of this {@code KStream}
* @deprecated since 2.8. Use {@link #split()} instead.
*/
@Deprecated
@SuppressWarnings("unchecked")
KStream<K, V>[] branch(final Predicate<? super K, ? super V>... predicates);
@ -773,10 +775,32 @@ public interface KStream<K, V> {
* @param named a {@link Named} config used to name the processor in the topology
* @param predicates the ordered list of {@link Predicate} instances
* @return multiple distinct substreams of this {@code KStream}
* @deprecated since 2.8. Use {@link #split(Named)} instead.
*/
@Deprecated
@SuppressWarnings("unchecked")
KStream<K, V>[] branch(final Named named, final Predicate<? super K, ? super V>... predicates);
/**
* Split this stream. {@link BranchedKStream} can be used for routing the records to different branches depending
* on evaluation against the supplied predicates.
* Stream branching is a stateless record-by-record operation.
*
* @return {@link BranchedKStream} that provides methods for routing the records to different branches.
*/
BranchedKStream<K, V> split();
/**
* Split this stream. {@link BranchedKStream} can be used for routing the records to different branches depending
* on evaluation against the supplied predicates.
* Stream branching is a stateless record-by-record operation.
*
* @param named a {@link Named} config used to name the processor in the topology and also to set the name prefix
* for the resulting branches (see {@link BranchedKStream})
* @return {@link BranchedKStream} that provides methods for routing the records to different branches.
*/
BranchedKStream<K, V> split(final Named named);
/**
* Merge this stream and the given stream into one larger stream.
* <p>
@ -817,7 +841,7 @@ public interface KStream<K, V> {
*
* @param topic the topic name
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
* @deprecated since 2.6; used {@link #repartition()} instead
* @deprecated since 2.6; use {@link #repartition()} instead
*/
// TODO: when removed, update `StreamsResetter` decription of --intermediate-topics
@Deprecated


@ -27,7 +27,7 @@ import org.apache.kafka.streams.KeyValue;
* @param <V> value type
* @see KStream#filter(Predicate)
* @see KStream#filterNot(Predicate)
* @see KStream#branch(Predicate[])
* @see BranchedKStream#branch(Predicate)
* @see KTable#filter(Predicate)
* @see KTable#filterNot(Predicate)
*/


@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import java.util.function.Consumer;
import java.util.function.Function;
class BranchedInternal<K, V> extends Branched<K, V> {
BranchedInternal(final Branched<K, V> branched) {
super(branched);
}
BranchedInternal() {
super(null, null, null);
}
static <K, V> BranchedInternal<K, V> empty() {
return new BranchedInternal<>();
}
String name() {
return name;
}
public Function<? super KStream<K, V>, ? extends KStream<K, V>> chainFunction() {
return chainFunction;
}
public Consumer<? super KStream<K, V>> chainConsumer() {
return chainConsumer;
}
}


@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.BranchedKStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class BranchedKStreamImpl<K, V> implements BranchedKStream<K, V> {
private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
private final KStreamImpl<K, V> source;
private final boolean repartitionRequired;
private final String splitterName;
private final Map<String, KStream<K, V>> outputBranches = new HashMap<>();
private final List<Predicate<? super K, ? super V>> predicates = new ArrayList<>();
private final List<String> childNames = new ArrayList<>();
private final ProcessorGraphNode<K, V> splitterNode;
BranchedKStreamImpl(final KStreamImpl<K, V> source, final boolean repartitionRequired, final NamedInternal named) {
this.source = source;
this.repartitionRequired = repartitionRequired;
this.splitterName = named.orElseGenerateWithPrefix(source.builder, BRANCH_NAME);
// predicates and childNames are passed by reference: branches added after the splitter node is created are still picked up by the splitter processor
final ProcessorParameters<K, V, ?, ?> processorParameters =
new ProcessorParameters<>(new KStreamBranch<>(predicates, childNames), splitterName);
splitterNode = new ProcessorGraphNode<>(splitterName, processorParameters);
source.builder.addGraphNode(source.graphNode, splitterNode);
}
@Override
public BranchedKStream<K, V> branch(final Predicate<? super K, ? super V> predicate) {
return branch(predicate, BranchedInternal.empty());
}
@Override
public BranchedKStream<K, V> branch(final Predicate<? super K, ? super V> predicate, final Branched<K, V> branched) {
predicates.add(predicate);
createBranch(branched, predicates.size());
return this;
}
@Override
public Map<String, KStream<K, V>> defaultBranch() {
return defaultBranch(BranchedInternal.empty());
}
@Override
public Map<String, KStream<K, V>> defaultBranch(final Branched<K, V> branched) {
createBranch(branched, 0);
return outputBranches;
}
private void createBranch(final Branched<K, V> branched, final int index) {
final BranchedInternal<K, V> branchedInternal = new BranchedInternal<>(branched);
final String branchChildName = getBranchChildName(index, branchedInternal);
childNames.add(branchChildName);
source.builder.newProcessorName(branchChildName);
final ProcessorParameters<K, V, ?, ?> parameters = new ProcessorParameters<>(new PassThrough<>(), branchChildName);
final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(branchChildName, parameters);
source.builder.addGraphNode(splitterNode, branchChildNode);
final KStreamImpl<K, V> branch = new KStreamImpl<>(branchChildName, source.keySerde,
source.valueSerde, source.subTopologySourceNodes,
repartitionRequired, branchChildNode, source.builder);
process(branch, branchChildName, branchedInternal);
}
private String getBranchChildName(final int index, final BranchedInternal<K, V> branchedInternal) {
if (branchedInternal.name() == null) {
return splitterName + index;
} else {
return splitterName + branchedInternal.name();
}
}
private void process(final KStreamImpl<K, V> branch, final String branchChildName,
final BranchedInternal<K, V> branchedInternal) {
if (branchedInternal.chainFunction() != null) {
final KStream<K, V> transformedStream = branchedInternal.chainFunction().apply(branch);
if (transformedStream != null) {
outputBranches.put(branchChildName, transformedStream);
}
} else if (branchedInternal.chainConsumer() != null) {
branchedInternal.chainConsumer().accept(branch);
} else {
outputBranches.put(branchChildName, branch);
}
}
@Override
public Map<String, KStream<K, V>> noDefaultBranch() {
return outputBranches;
}
}
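The decision in `process(...)` about which branches reach the result map can be mirrored in a stand-alone sketch (hypothetical names; a plain `String` stands in for a branch `KStream`, so this is a model of the logic, not Kafka Streams code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Stand-alone mirror of the map-population rules: a branch reaches the result
// map directly, via a non-null chain-function result, or not at all when a
// consumer is given or the chain function returns null.
public class BranchMapSketch {
    public static <S> void addToMap(final Map<String, S> out,
                                    final String childName,
                                    final S branch,
                                    final Function<S, S> chainFunction,
                                    final Consumer<S> chainConsumer) {
        if (chainFunction != null) {
            final S transformed = chainFunction.apply(branch);
            if (transformed != null) {
                out.put(childName, transformed); // non-null function result is kept
            }
        } else if (chainConsumer != null) {
            chainConsumer.accept(branch);        // consumed branches never reach the map
        } else {
            out.put(childName, branch);          // plain branches are kept as-is
        }
    }

    public static void main(final String[] args) {
        final Map<String, String> out = new HashMap<>();
        addToMap(out, "split-a", "branchA", null, null);               // kept
        addToMap(out, "split-b", "branchB", s -> s + "-mapped", null); // kept, transformed
        addToMap(out, "split-c", "branchC", s -> null, null);          // dropped
        addToMap(out, "split-d", "branchD", null, s -> { });           // dropped
        System.out.println(out.get("split-b")); // branchB-mapped
    }
}
```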


@ -22,13 +22,15 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;
import java.util.List;
class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {
private final Predicate<K, V>[] predicates;
private final String[] childNodes;
private final List<Predicate<? super K, ? super V>> predicates;
private final List<String> childNodes;
KStreamBranch(final Predicate<K, V>[] predicates,
final String[] childNodes) {
KStreamBranch(final List<Predicate<? super K, ? super V>> predicates,
final List<String> childNodes) {
this.predicates = predicates;
this.childNodes = childNodes;
}
@ -41,13 +43,17 @@ class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {
private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
@Override
public void process(final K key, final V value) {
for (int i = 0; i < predicates.length; i++) {
if (predicates[i].test(key, value)) {
for (int i = 0; i < predicates.size(); i++) {
if (predicates.get(i).test(key, value)) {
// use forward with child here and then break the loop
// so that no record is going to be piped to multiple streams
context().forward(key, value, To.child(childNodes[i]));
break;
}
context().forward(key, value, To.child(childNodes.get(i)));
return;
}
}
// using default child node if supplied
if (childNodes.size() > predicates.size()) {
context().forward(key, value, To.child(childNodes.get(predicates.size())));
}
}
}
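The rewritten list-based processor loop can be modelled without any Kafka classes; this sketch (hypothetical names) reproduces the first-match routing with an optional default child, where a trailing extra child name plays the role of the default branch:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of the branch processor loop: a record goes to the child of
// the first matching predicate; if nothing matches and an extra child name
// exists (the default branch), it goes there; otherwise it is dropped (null).
public class FirstMatchRouter {
    public static <T> String route(final T record,
                                   final List<Predicate<T>> predicates,
                                   final List<String> childNodes) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(record)) {
                return childNodes.get(i); // first match wins; later predicates are skipped
            }
        }
        if (childNodes.size() > predicates.size()) {
            return childNodes.get(predicates.size()); // default child, if supplied
        }
        return null; // no match and no default: record is dropped
    }

    public static void main(final String[] args) {
        final List<Predicate<String>> predicates =
                Arrays.asList(s -> s.startsWith("A"), s -> s.startsWith("B"));
        final List<String> children = Arrays.asList("branch-A", "branch-B", "branch-0");
        System.out.println(route("Apple", predicates, children));  // branch-A
        System.out.println(route("Cherry", predicates, children)); // branch-0
    }
}
```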


@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.BranchedKStream;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
@ -440,12 +441,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder);
}
@Deprecated
@Override
@SuppressWarnings("unchecked")
public KStream<K, V>[] branch(final Predicate<? super K, ? super V>... predicates) {
return doBranch(NamedInternal.empty(), predicates);
}
+ @Deprecated
@Override
@SuppressWarnings("unchecked")
public KStream<K, V>[] branch(final Named named,
@@ -472,7 +475,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
final ProcessorParameters processorParameters =
- new ProcessorParameters<>(new KStreamBranch(predicates.clone(), childNames), branchName);
+ new ProcessorParameters<>(new KStreamBranch(Arrays.asList(predicates.clone()),
+ Arrays.asList(childNames)), branchName);
final ProcessorGraphNode<K, V> branchNode =
new ProcessorGraphNode<>(branchName, processorParameters);
@@ -492,6 +496,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
return branchChildren;
}
@Override
public BranchedKStream<K, V> split() {
return new BranchedKStreamImpl<>(this, repartitionRequired, NamedInternal.empty());
}
@Override
public BranchedKStream<K, V> split(final Named named) {
Objects.requireNonNull(named, "named can't be null");
return new BranchedKStreamImpl<>(this, repartitionRequired, new NamedInternal(named));
}
@Override
public KStream<K, V> merge(final KStream<K, V> stream) {
return merge(stream, NamedInternal.empty());


@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Topology.AutoOffsetReset;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Grouped;
@@ -644,7 +645,7 @@ public class StreamsBuilderTest {
}
@Test
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
public void shouldUseSpecifiedNameForBranchOperation() {
builder.stream(STREAM_TOPIC)
.branch(Named.as("branch-processor"), (k, v) -> true, (k, v) -> false);
@@ -658,6 +659,21 @@
"branch-processor-predicate-1");
}
@Test
public void shouldUseSpecifiedNameForSplitOperation() {
builder.stream(STREAM_TOPIC)
.split(Named.as("branch-processor"))
.branch((k, v) -> true, Branched.as("-1"))
.branch((k, v) -> false, Branched.as("-2"));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
"branch-processor",
"branch-processor-1",
"branch-processor-2");
}
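A hedged sketch of the naming rule this test exercises: the `Named` value names the branch processor itself, and each branch child gets that prefix plus the `Branched` suffix, with the suffix defaulting to the 1-based branch position and `"0"` reserved for the default branch (`branchNames` is a hypothetical illustration-only helper, not Kafka API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BranchNaming {
    // Sketch of split()'s child-naming rule: each branch is named
    // prefix + suffix; a missing (null) suffix defaults to the 1-based
    // branch position, and the default branch (when present) uses "0".
    public static List<String> branchNames(final String prefix,
                                           final List<String> suffixes,
                                           final boolean hasDefaultBranch) {
        final List<String> names = new ArrayList<>();
        for (int i = 0; i < suffixes.size(); i++) {
            final String suffix = suffixes.get(i) != null ? suffixes.get(i) : String.valueOf(i + 1);
            names.add(prefix + suffix);
        }
        if (hasDefaultBranch) {
            names.add(prefix + "0");
        }
        return names;
    }

    public static void main(final String[] args) {
        // Mirrors shouldUseSpecifiedNameForSplitOperation:
        // split(Named.as("branch-processor")) with Branched.as("-1"), Branched.as("-2")
        System.out.println(branchNames("branch-processor", Arrays.asList("-1", "-2"), false));
        // prints [branch-processor-1, branch-processor-2]
    }
}
```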
@Test
public void shouldUseSpecifiedNameForJoinOperationBetweenKStreamAndKTable() {
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);


@@ -40,7 +40,7 @@ public class KStreamBranchTest {
private final String topicName = "topic";
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Test
public void testKStreamBranch() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -77,7 +77,7 @@ public class KStreamBranchTest {
assertEquals(2, processors.get(2).processed().size());
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Test
public void testTypeVariance() {
final Predicate<Number, Object> positive = (key, value) -> key.doubleValue() > 0;


@@ -481,7 +481,7 @@ public class KStreamImplTest {
}
@Test
- @SuppressWarnings({"rawtypes", "unchecked"})
+ @SuppressWarnings({"rawtypes", "unchecked", "deprecation"})
public void shouldNotAllowNullPredicatedOnBranch() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
@@ -490,7 +490,7 @@
}
@Test
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
public void shouldHaveAtLeastOnPredicateWhenBranching() {
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
@@ -498,7 +498,7 @@
assertThat(exception.getMessage(), equalTo("branch() requires at least one predicate"));
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Test
public void shouldHaveAtLeastOnPredicateWhenBranchingWithNamed() {
final IllegalArgumentException exception = assertThrows(
@@ -507,7 +507,7 @@
assertThat(exception.getMessage(), equalTo("branch() requires at least one predicate"));
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Test
public void shouldNotAllowNullNamedOnBranch() {
final NullPointerException exception = assertThrows(
@@ -516,7 +516,7 @@
assertThat(exception.getMessage(), equalTo("named can't be null"));
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Test
public void shouldCantHaveNullPredicate() {
final NullPointerException exception = assertThrows(
@@ -525,7 +525,7 @@
assertThat(exception.getMessage(), equalTo("predicates can't be null"));
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "deprecation"})
@Test
public void shouldCantHaveNullPredicateWithNamed() {
final NullPointerException exception = assertThrows(


@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
public class KStreamSplitTest {
private final String topicName = "topic";
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final StreamsBuilder builder = new StreamsBuilder();
private final Predicate<Integer, String> isEven = (key, value) -> (key % 2) == 0;
private final Predicate<Integer, String> isMultipleOfThree = (key, value) -> (key % 3) == 0;
private final Predicate<Integer, String> isMultipleOfFive = (key, value) -> (key % 5) == 0;
private final Predicate<Integer, String> isMultipleOfSeven = (key, value) -> (key % 7) == 0;
private final KStream<Integer, String> source = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
@Test
public void testKStreamSplit() {
final Map<String, KStream<Integer, String>> branches =
source.split()
.branch(isEven, Branched.withConsumer(ks -> ks.to("x2")))
.branch(isMultipleOfThree, Branched.withConsumer(ks -> ks.to("x3")))
.branch(isMultipleOfFive, Branched.withConsumer(ks -> ks.to("x5"))).noDefaultBranch();
assertEquals(0, branches.size());
builder.build();
withDriver(driver -> {
final TestOutputTopic<Integer, String> x2 = driver.createOutputTopic("x2", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> x3 = driver.createOutputTopic("x3", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> x5 = driver.createOutputTopic("x5", new IntegerDeserializer(), new StringDeserializer());
assertEquals(Arrays.asList("V2", "V4", "V6"), x2.readValuesToList());
assertEquals(Arrays.asList("V3"), x3.readValuesToList());
assertEquals(Arrays.asList("V5"), x5.readValuesToList());
});
}
private void withDriver(final Consumer<TopologyTestDriver> test) {
final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
final Topology topology = builder.build();
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
final TestInputTopic<Integer, String> inputTopic = driver.createInputTopic(topicName, new IntegerSerializer(), new StringSerializer());
for (final int expectedKey : expectedKeys) {
inputTopic.pipeInput(expectedKey, "V" + expectedKey);
}
test.accept(driver);
}
}
@Test
public void testTypeVariance() {
final Predicate<Number, Object> positive = (key, value) -> key.doubleValue() > 0;
final Predicate<Number, Object> negative = (key, value) -> key.doubleValue() < 0;
new StreamsBuilder()
.<Integer, String>stream("empty")
.split()
.branch(positive)
.branch(negative);
}
@Test
public void testResultingMap() {
final Map<String, KStream<Integer, String>> branches =
source.split(Named.as("foo-"))
// "foo-bar"
.branch(isEven, Branched.as("bar"))
// no entry: a Consumer is provided
.branch(isMultipleOfThree, Branched.withConsumer(ks -> {
}))
// no entry: chain function returns null
.branch(isMultipleOfFive, Branched.withFunction(ks -> null))
// "foo-4": name defaults to the branch position
.branch(isMultipleOfSeven)
// "foo-0": "0" is the default name for the default branch
.defaultBranch();
assertEquals(3, branches.size());
branches.get("foo-bar").to("foo-bar");
branches.get("foo-4").to("foo-4");
branches.get("foo-0").to("foo-0");
builder.build();
withDriver(driver -> {
final TestOutputTopic<Integer, String> even = driver.createOutputTopic("foo-bar", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> x7 = driver.createOutputTopic("foo-4", new IntegerDeserializer(), new StringDeserializer());
final TestOutputTopic<Integer, String> defaultBranch = driver.createOutputTopic("foo-0", new IntegerDeserializer(), new StringDeserializer());
assertEquals(Arrays.asList("V2", "V4", "V6"), even.readValuesToList());
assertEquals(Arrays.asList("V7"), x7.readValuesToList());
assertEquals(Arrays.asList("V1"), defaultBranch.readValuesToList());
});
}
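The inline comments in the test above spell out which branches end up in the returned `Map`. A stand-alone sketch of those rules (the `Handling` enum and `resultMap` helper are hypothetical names for illustration only; the `String` values stand in for the branch `KStream` instances):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SplitResultMap {
    // Rules from the test above for which branches appear in the Map returned
    // by defaultBranch()/noDefaultBranch():
    //  - a branch is excluded when a Branched consumer handles it, or when its
    //    Branched chain function returned null; otherwise it is included;
    //  - the key is prefix + suffix (suffix defaults to the 1-based position,
    //    and to "0" for the default branch).
    public enum Handling { KEEP, CONSUMED, CHAIN_RETURNED_NULL }

    public static Map<String, String> resultMap(final String prefix,
                                                final String[] suffixes,
                                                final Handling[] handling,
                                                final boolean hasDefault) {
        final Map<String, String> map = new LinkedHashMap<>();
        for (int i = 0; i < handling.length; i++) {
            if (handling[i] == Handling.KEEP) {
                final String suffix = suffixes[i] != null ? suffixes[i] : String.valueOf(i + 1);
                map.put(prefix + suffix, "branch-" + (i + 1)); // value stands in for the KStream
            }
        }
        if (hasDefault) {
            map.put(prefix + "0", "default-branch");
        }
        return map;
    }

    public static void main(final String[] args) {
        // Mirrors testResultingMap: named branch, consumed branch,
        // chain-returned-null branch, unnamed branch, plus defaultBranch().
        final Map<String, String> m = resultMap("foo-",
            new String[]{"bar", null, null, null},
            new Handling[]{Handling.KEEP, Handling.CONSUMED, Handling.CHAIN_RETURNED_NULL, Handling.KEEP},
            true);
        System.out.println(m.keySet()); // [foo-bar, foo-4, foo-0]
    }
}
```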
@Test
public void testBranchingWithNoTerminalOperation() {
source.split()
.branch(isEven, Branched.withConsumer(ks -> ks.to("output")))
.branch(isMultipleOfFive, Branched.withConsumer(ks -> ks.to("output")));
builder.build();
withDriver(driver -> {
final TestOutputTopic<Integer, String> outputTopic =
driver.createOutputTopic("output", new IntegerDeserializer(), new StringDeserializer());
assertEquals(Arrays.asList("V2", "V4", "V5", "V6"), outputTopic.readValuesToList());
});
}
}


@@ -23,6 +23,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
@@ -112,7 +113,6 @@ public class StreamsGraphTest {
}
@Test
- @SuppressWarnings("unchecked")
// Topology in this test from https://issues.apache.org/jira/browse/KAFKA-9739
public void shouldNotThrowNPEWithMergeNodes() {
final Properties properties = new Properties();
@@ -165,18 +165,19 @@
.leftJoin(idTable, (v1, v2) -> v1 + v2,
Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));
- final KStream<String, String>[] branches = joinStream.branch((k, v) -> v.equals("some-value"), (k, v) -> true);
- branches[0].map(KeyValue::pair)
+ joinStream.split()
+ .branch((k, v) -> v.equals("some-value"), Branched.withConsumer(ks -> ks.map(KeyValue::pair)
.peek((recipientId, command) -> System.out.println("printing out"))
- .to("external-command", Produced.with(Serdes.String(), Serdes.String()));
- branches[1].filter((k, v) -> v != null)
+ .to("external-command", Produced.with(Serdes.String(), Serdes.String()))
+ ))
+ .defaultBranch(Branched.withConsumer(ks -> {
+ ks.filter((k, v) -> v != null)
.peek((subscriptionId, wrapper) -> System.out.println("Printing output"))
.mapValues((k, v) -> v)
.to("dlq-topic", Produced.with(Serdes.String(), Serdes.String()));
- branches[1].map(KeyValue::pair).to("retryTopic", Produced.with(Serdes.String(), Serdes.String()));
+ ks.map(KeyValue::pair).to("retryTopic", Produced.with(Serdes.String(), Serdes.String()));
+ }));
final Topology topology = builder.build(properties);
assertEquals(expectedComplexMergeOptimizeTopology, topology.describe().toString());
@@ -473,42 +474,42 @@ public class StreamsGraphTest {
" --> KSTREAM-BRANCH-0000000027\n" +
" <-- KSTREAM-SOURCE-0000000025\n" +
" Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n" +
- " --> KSTREAM-BRANCHCHILD-0000000029, KSTREAM-BRANCHCHILD-0000000028\n" +
+ " --> KSTREAM-BRANCH-00000000270, KSTREAM-BRANCH-00000000271\n" +
" <-- KSTREAM-LEFTJOIN-0000000026\n" +
- " Processor: KSTREAM-BRANCHCHILD-0000000029 (stores: [])\n" +
+ " Processor: KSTREAM-BRANCH-00000000270 (stores: [])\n" +
" --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n" +
" <-- KSTREAM-BRANCH-0000000027\n" +
- " Processor: KSTREAM-BRANCHCHILD-0000000028 (stores: [])\n" +
- " --> KSTREAM-MAP-0000000030\n" +
+ " Processor: KSTREAM-BRANCH-00000000271 (stores: [])\n" +
+ " --> KSTREAM-MAP-0000000029\n" +
" <-- KSTREAM-BRANCH-0000000027\n" +
" Processor: KSTREAM-FILTER-0000000033 (stores: [])\n" +
" --> KSTREAM-PEEK-0000000034\n" +
- " <-- KSTREAM-BRANCHCHILD-0000000029\n" +
+ " <-- KSTREAM-BRANCH-00000000270\n" +
" Source: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n" +
" --> KSTREAM-PEEK-0000000013\n" +
- " Processor: KSTREAM-MAP-0000000030 (stores: [])\n" +
- " --> KSTREAM-PEEK-0000000031\n" +
- " <-- KSTREAM-BRANCHCHILD-0000000028\n" +
+ " Processor: KSTREAM-MAP-0000000029 (stores: [])\n" +
+ " --> KSTREAM-PEEK-0000000030\n" +
+ " <-- KSTREAM-BRANCH-00000000271\n" +
" Processor: KSTREAM-PEEK-0000000034 (stores: [])\n" +
" --> KSTREAM-MAPVALUES-0000000035\n" +
" <-- KSTREAM-FILTER-0000000033\n" +
" Processor: KSTREAM-MAP-0000000037 (stores: [])\n" +
" --> KSTREAM-SINK-0000000038\n" +
- " <-- KSTREAM-BRANCHCHILD-0000000029\n" +
+ " <-- KSTREAM-BRANCH-00000000270\n" +
" Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n" +
" --> KSTREAM-SINK-0000000036\n" +
" <-- KSTREAM-PEEK-0000000034\n" +
" Processor: KSTREAM-PEEK-0000000013 (stores: [])\n" +
" --> KSTREAM-AGGREGATE-0000000015\n" +
" <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-source\n" +
- " Processor: KSTREAM-PEEK-0000000031 (stores: [])\n" +
- " --> KSTREAM-SINK-0000000032\n" +
- " <-- KSTREAM-MAP-0000000030\n" +
+ " Processor: KSTREAM-PEEK-0000000030 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000031\n" +
+ " <-- KSTREAM-MAP-0000000029\n" +
" Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" +
" --> none\n" +
" <-- KSTREAM-PEEK-0000000013\n" +
- " Sink: KSTREAM-SINK-0000000032 (topic: external-command)\n" +
- " <-- KSTREAM-PEEK-0000000031\n" +
+ " Sink: KSTREAM-SINK-0000000031 (topic: external-command)\n" +
+ " <-- KSTREAM-PEEK-0000000030\n" +
" Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n" +
" <-- KSTREAM-MAPVALUES-0000000035\n" +
" Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n" +


@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.{Branched => BranchedJ, KStream => KStreamJ}
object Branched {
/**
* Create an instance of `Branched` with the provided branch name suffix.
*
* @param name the branch name suffix to be used (see [[BranchedKStream]] description for details)
* @tparam K key type
* @tparam V value type
* @return a new instance of `Branched`
*/
def as[K, V](name: String): BranchedJ[K, V] =
BranchedJ.as[K, V](name)
/**
* Create an instance of `Branched` with the provided chain function and branch name suffix.
*
* @param chain A function that will be applied to the branch. If the provided function returns
* `null`, its result is ignored, otherwise it is added to the Map returned
* by [[BranchedKStream.defaultBranch]] or [[BranchedKStream.noDefaultBranch]] (see
* [[BranchedKStream]] description for details).
* @param name the branch name suffix to be used. If `null`, a default branch name suffix will be generated
* (see [[BranchedKStream]] description for details)
* @tparam K key type
* @tparam V value type
* @return a new instance of `Branched`
* @see `org.apache.kafka.streams.kstream.Branched#withFunction(java.util.function.Function, java.lang.String)`
*/
def withFunction[K, V](chain: KStream[K, V] => KStream[K, V], name: String = null): BranchedJ[K, V] =
BranchedJ.withFunction((f: KStreamJ[K, V]) => chain.apply(new KStream[K, V](f)).inner, name)
/**
* Create an instance of `Branched` with the provided chain consumer and branch name suffix.
*
* @param chain A consumer to which the branch will be sent. If a non-null consumer is provided here,
* the respective branch will not be added to the resulting Map returned
* by [[BranchedKStream.defaultBranch]] or [[BranchedKStream.noDefaultBranch]] (see
* [[BranchedKStream]] description for details).
* @param name the branch name suffix to be used. If `null`, a default branch name suffix will be generated
* (see [[BranchedKStream]] description for details)
* @tparam K key type
* @tparam V value type
* @return a new instance of `Branched`
* @see `org.apache.kafka.streams.kstream.Branched#withConsumer(java.util.function.Consumer, java.lang.String)`
*/
def withConsumer[K, V](chain: KStream[K, V] => Unit, name: String = null): BranchedJ[K, V] =
BranchedJ.withConsumer((c: KStreamJ[K, V]) => chain.apply(new KStream[K, V](c)), name)
}


@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.scala.kstream
import java.util
import org.apache.kafka.streams.kstream
import org.apache.kafka.streams.kstream.{BranchedKStream => BranchedKStreamJ}
import org.apache.kafka.streams.scala.FunctionsCompatConversions.PredicateFromFunction
import scala.jdk.CollectionConverters._
/**
* Branches the records in the original stream based on the predicates supplied for the branch definitions.
* <p>
* Branches are defined with [[branch]] or [[defaultBranch]] methods. Each record is evaluated against the predicates
* supplied via [[Branched]] parameters, and is routed to the first branch for which its respective predicate
* evaluates to `true`. If a record does not match any predicates, it will be routed to the default branch,
* or dropped if no default branch is created.
* <p>
*
* Each branch (which is a [[KStream]] instance) can then be processed either by
* a function or by a consumer provided via a [[Branched]]
* parameter. If certain conditions are met, it can also be accessed from the `Map` returned by
* an optional [[defaultBranch]] or [[noDefaultBranch]] method call.
* <p>
* The branching happens on a first match basis: A record in the original stream is assigned to the corresponding result
* stream for the first predicate that evaluates to true, and is assigned to this stream only. If you need
* to route a record to multiple streams, you can apply multiple [[KStream.filter]] operators to the same [[KStream]]
* instance, one for each predicate, instead of branching.
* <p>
* The process of routing the records to different branches is a stateless record-by-record operation.
*
* @tparam K Type of keys
* @tparam V Type of values
*/
class BranchedKStream[K, V](val inner: BranchedKStreamJ[K, V]) {
/**
* Define a branch for records that match the predicate.
*
* @param predicate A predicate against which each record will be evaluated.
* If this predicate returns `true` for a given record, the record will be
* routed to the current branch and will not be evaluated against the predicates
* for the remaining branches.
* @return `this` to facilitate method chaining
*/
def branch(predicate: (K, V) => Boolean): BranchedKStream[K, V] = {
inner.branch(predicate.asPredicate)
this
}
/**
* Define a branch for records that match the predicate.
*
* @param predicate A predicate against which each record will be evaluated.
* If this predicate returns `true` for a given record, the record will be
* routed to the current branch and will not be evaluated against the predicates
* for the remaining branches.
* @param branched A [[Branched]] parameter that allows defining a branch name, an in-place
* branch consumer, or a branch mapper (see <a href="#examples">code examples</a>
* for [[BranchedKStream]])
* @return `this` to facilitate method chaining
*/
def branch(predicate: (K, V) => Boolean, branched: Branched[K, V]): BranchedKStream[K, V] = {
inner.branch(predicate.asPredicate, branched)
this
}
/**
* Finalizes the construction of branches and defines the default branch for the messages not intercepted
* by other branches. Calling [[defaultBranch]] or [[noDefaultBranch]] is optional.
*
* @return Map of named branches. For rules of forming the resulting map, see [[BranchedKStream]]
* description.
*/
def defaultBranch(): Map[String, KStream[K, V]] = toScalaMap(inner.defaultBranch())
/**
* Finalizes the construction of branches and defines the default branch for the messages not intercepted
* by other branches. Calling [[defaultBranch]] or [[noDefaultBranch]] is optional.
*
* @param branched A [[Branched]] parameter that allows defining a branch name, an in-place
* branch consumer, or a branch mapper for [[BranchedKStream]].
* @return Map of named branches. For rules of forming the resulting map, see [[BranchedKStream]]
* description.
*/
def defaultBranch(branched: Branched[K, V]): Map[String, KStream[K, V]] = toScalaMap(inner.defaultBranch(branched))
/**
* Finalizes the construction of branches without forming a default branch.
*
* @return Map of named branches. For rules of forming the resulting map, see [[BranchedKStream]]
* description.
*/
def noDefaultBranch(): Map[String, KStream[K, V]] = toScalaMap(inner.noDefaultBranch())
private def toScalaMap(m: util.Map[String, kstream.KStream[K, V]]): collection.immutable.Map[String, KStream[K, V]] =
m.asScala.map {
case (name, kStreamJ) => (name, new KStream(kStreamJ))
}.toMap
}


@@ -333,23 +333,36 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @param predicates the ordered list of functions that return a Boolean
* @return multiple distinct substreams of this [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#branch`
+ * @deprecated since 2.8. Use `split` instead.
*/
//noinspection ScalaUnnecessaryParentheses
+ @deprecated("use `split()` instead", "2.8")
def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] =
inner.branch(predicates.map(_.asPredicate): _*).map(kstream => new KStream(kstream))
/**
- * Creates an array of `KStream` from this stream by branching the records in the original stream based on
- * the supplied predicates.
+ * Split this stream. [[BranchedKStream]] can be used for routing the records to different branches depending
+ * on evaluation against the supplied predicates.
+ * Stream branching is a stateless record-by-record operation.
*
- * @param named a [[Named]] config used to name the processor in the topology
- * @param predicates the ordered list of functions that return a Boolean
- * @return multiple distinct substreams of this [[KStream]]
- * @see `org.apache.kafka.streams.kstream.KStream#branch`
+ * @return [[BranchedKStream]] that provides methods for routing the records to different branches.
+ * @see `org.apache.kafka.streams.kstream.KStream#split`
*/
- //noinspection ScalaUnnecessaryParentheses
- def branch(named: Named, predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] =
- inner.branch(named, predicates.map(_.asPredicate): _*).map(kstream => new KStream(kstream))
+ def split(): BranchedKStream[K, V] =
+ new BranchedKStream(inner.split())
/**
* Split this stream. [[BranchedKStream]] can be used for routing the records to different branches depending
* on evaluation against the supplied predicates.
* Stream branching is a stateless record-by-record operation.
*
* @param named a [[Named]] config used to name the processor in the topology and also to set the name prefix
* for the resulting branches (see [[BranchedKStream]])
* @return [[BranchedKStream]] that provides methods for routing the records to different branches.
* @see `org.apache.kafka.streams.kstream.KStream#split`
*/
def split(named: Named): BranchedKStream[K, V] =
new BranchedKStream(inner.split(named))
/**
* Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Produced` instance for


@@ -27,4 +27,5 @@ package object kstream {
type Joined[K, V, VO] = org.apache.kafka.streams.kstream.Joined[K, V, VO]
type StreamJoined[K, V, VO] = org.apache.kafka.streams.kstream.StreamJoined[K, V, VO]
type Named = org.apache.kafka.streams.kstream.Named
type Branched[K, V] = org.apache.kafka.streams.kstream.Branched[K, V]
}


@@ -0,0 +1,126 @@
/*
* Copyright (C) 2018 Joan Goyeau.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.Named
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.utils.TestDriver
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.junit.JUnitRunner
import scala.jdk.CollectionConverters._
@RunWith(classOf[JUnitRunner])
class KStreamSplitTest extends FlatSpec with Matchers with TestDriver {
"split" should "route messages according to predicates" in {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = Array("default", "even", "three")
val m = builder
.stream[Integer, Integer](sourceTopic)
.split(Named.as("_"))
.branch((_, v) => v % 2 == 0)
.branch((_, v) => v % 3 == 0)
.defaultBranch()
m("_0").to(sinkTopic(0))
m("_1").to(sinkTopic(1))
m("_2").to(sinkTopic(2))
val testDriver = createTestDriver(builder)
val testInput = testDriver.createInput[Integer, Integer](sourceTopic)
val testOutput = sinkTopic.map(name => testDriver.createOutput[Integer, Integer](name))
testInput.pipeValueList(
List(1, 2, 3, 4, 5)
.map(Integer.valueOf)
.asJava
)
testOutput(0).readValuesToList().asScala shouldBe List(1, 5)
testOutput(1).readValuesToList().asScala shouldBe List(2, 4)
testOutput(2).readValuesToList().asScala shouldBe List(3)
testDriver.close()
}
"split" should "route messages to consumers" in {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val m = builder
.stream[Integer, Integer](sourceTopic)
.split(Named.as("_"))
.branch((_, v) => v % 2 == 0, Branched.withConsumer(ks => ks.to("even"), "consumedEvens"))
.branch((_, v) => v % 3 == 0, Branched.withFunction(ks => ks.mapValues(x => x * x), "mapped"))
.noDefaultBranch()
m("_mapped").to("mapped")
val testDriver = createTestDriver(builder)
val testInput = testDriver.createInput[Integer, Integer](sourceTopic)
testInput.pipeValueList(
List(1, 2, 3, 4, 5, 9)
.map(Integer.valueOf)
.asJava
)
val even = testDriver.createOutput[Integer, Integer]("even")
val mapped = testDriver.createOutput[Integer, Integer]("mapped")
even.readValuesToList().asScala shouldBe List(2, 4)
mapped.readValuesToList().asScala shouldBe List(9, 81)
testDriver.close()
}
"split" should "route messages to anonymous consumers" in {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val m = builder
.stream[Integer, Integer](sourceTopic)
.split(Named.as("_"))
.branch((_, v) => v % 2 == 0, Branched.withConsumer(ks => ks.to("even")))
.branch((_, v) => v % 3 == 0, Branched.withFunction(ks => ks.mapValues(x => x * x)))
.noDefaultBranch()
m("_2").to("mapped")
val testDriver = createTestDriver(builder)
val testInput = testDriver.createInput[Integer, Integer](sourceTopic)
testInput.pipeValueList(
List(1, 2, 3, 4, 5, 9)
.map(Integer.valueOf)
.asJava
)
val even = testDriver.createOutput[Integer, Integer]("even")
val mapped = testDriver.createOutput[Integer, Integer]("mapped")
even.readValuesToList().asScala shouldBe List(2, 4)
mapped.readValuesToList().asScala shouldBe List(9, 81)
testDriver.close()
}
}