From 3d436f52bf7e3309947e01e939233071d29c6284 Mon Sep 17 00:00:00 2001 From: TengYao Chi Date: Sun, 25 Aug 2024 14:24:54 +0800 Subject: [PATCH] KAFKA-12824 Delete unused doBranch method (#16981) Reviewers: Chia-Ping Tsai --- .../kstream/internals/KStreamImpl.java | 44 ------------------- 1 file changed, 44 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 556ff94c95a..324915f4c75 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -68,7 +68,6 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer; -import java.lang.reflect.Array; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -104,10 +103,6 @@ public class KStreamImpl extends AbstractStream implements KStream extends AbstractStream implements KStream[] doBranch(final NamedInternal named, - final Predicate... predicates) { - Objects.requireNonNull(predicates, "predicates can't be a null array"); - if (predicates.length == 0) { - throw new IllegalArgumentException("branch() requires at least one predicate"); - } - for (final Predicate predicate : predicates) { - Objects.requireNonNull(predicate, "predicates can't be null"); - } - - final String branchName = named.orElseGenerateWithPrefix(builder, BRANCH_NAME); - final String[] childNames = new String[predicates.length]; - for (int i = 0; i < predicates.length; i++) { - childNames[i] = named.suffixWithOrElseGet("-predicate-" + i, builder, BRANCHCHILD_NAME); - } - - final ProcessorParameters processorParameters = - new ProcessorParameters<>(new KStreamBranch(Arrays.asList(predicates.clone()), - Arrays.asList(childNames)), branchName); - final ProcessorGraphNode branchNode = - new ProcessorGraphNode<>(branchName, processorParameters); - - builder.addGraphNode(graphNode, branchNode); - - final KStream[] branchChildren = (KStream[]) Array.newInstance(KStream.class, predicates.length); - for (int i = 0; i < predicates.length; i++) { - final ProcessorParameters innerProcessorParameters = - new ProcessorParameters<>(new PassThrough(), childNames[i]); - final ProcessorGraphNode branchChildNode = - new ProcessorGraphNode<>(childNames[i], innerProcessorParameters); - - builder.addGraphNode(branchNode, branchChildNode); - branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, valueSerde, subTopologySourceNodes, repartitionRequired, branchChildNode, builder); - } - - return branchChildren; - } - @Override public BranchedKStream split() { return new BranchedKStreamImpl<>(this, repartitionRequired, NamedInternal.empty());