mirror of https://github.com/apache/kafka.git
KAFKA-12824 Delete unused doBranch method (#16981)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
a95dfe215b
commit
3d436f52bf
|
|
@ -68,7 +68,6 @@ import org.apache.kafka.streams.state.KeyValueStore;
|
|||
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
|
||||
import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
|
|
@ -104,10 +103,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
|
|||
|
||||
static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
|
||||
|
||||
private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
|
||||
|
||||
private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
|
||||
|
||||
private static final String FILTER_NAME = "KSTREAM-FILTER-";
|
||||
|
||||
private static final String PEEK_NAME = "KSTREAM-PEEK-";
|
||||
|
|
@ -450,45 +445,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
|
|||
builder);
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
private KStream<K, V>[] doBranch(final NamedInternal named,
|
||||
final Predicate<? super K, ? super V>... predicates) {
|
||||
Objects.requireNonNull(predicates, "predicates can't be a null array");
|
||||
if (predicates.length == 0) {
|
||||
throw new IllegalArgumentException("branch() requires at least one predicate");
|
||||
}
|
||||
for (final Predicate<? super K, ? super V> predicate : predicates) {
|
||||
Objects.requireNonNull(predicate, "predicates can't be null");
|
||||
}
|
||||
|
||||
final String branchName = named.orElseGenerateWithPrefix(builder, BRANCH_NAME);
|
||||
final String[] childNames = new String[predicates.length];
|
||||
for (int i = 0; i < predicates.length; i++) {
|
||||
childNames[i] = named.suffixWithOrElseGet("-predicate-" + i, builder, BRANCHCHILD_NAME);
|
||||
}
|
||||
|
||||
final ProcessorParameters processorParameters =
|
||||
new ProcessorParameters<>(new KStreamBranch(Arrays.asList(predicates.clone()),
|
||||
Arrays.asList(childNames)), branchName);
|
||||
final ProcessorGraphNode<K, V> branchNode =
|
||||
new ProcessorGraphNode<>(branchName, processorParameters);
|
||||
|
||||
builder.addGraphNode(graphNode, branchNode);
|
||||
|
||||
final KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
|
||||
for (int i = 0; i < predicates.length; i++) {
|
||||
final ProcessorParameters innerProcessorParameters =
|
||||
new ProcessorParameters<>(new PassThrough<K, V>(), childNames[i]);
|
||||
final ProcessorGraphNode<K, V> branchChildNode =
|
||||
new ProcessorGraphNode<>(childNames[i], innerProcessorParameters);
|
||||
|
||||
builder.addGraphNode(branchNode, branchChildNode);
|
||||
branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, valueSerde, subTopologySourceNodes, repartitionRequired, branchChildNode, builder);
|
||||
}
|
||||
|
||||
return branchChildren;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BranchedKStream<K, V> split() {
|
||||
return new BranchedKStreamImpl<>(this, repartitionRequired, NamedInternal.empty());
|
||||
|
|
|
|||
Loading…
Reference in New Issue