From 99b8b51f1ecc7fb92f3d7c48709b20133cf15bb2 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 24 Apr 2020 19:18:00 -0700 Subject: [PATCH] MINOR: Remove unused foreign-key join class (#8547) Reviewers: John Roesler --- ...bleKTableForeignKeyJoinResolutionNode.java | 82 ------------------- 1 file changed, 82 deletions(-) delete mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableForeignKeyJoinResolutionNode.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableForeignKeyJoinResolutionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableForeignKeyJoinResolutionNode.java deleted file mode 100644 index 90fc177c06e..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableForeignKeyJoinResolutionNode.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals.graph; - -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.kstream.internals.Change; -import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier; -import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper; -import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper; -import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; -import org.apache.kafka.streams.processor.internals.InternalTopicProperties; -import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; - -/** - * Too much specific information to generalize so the Foreign Key KTable-KTable join requires a specific node. - */ -public class KTableKTableForeignKeyJoinResolutionNode extends StreamsGraphNode { - private final ProcessorParameters> joinOneToOneProcessorParameters; - private final ProcessorParameters> joinByPrefixProcessorParameters; - private final ProcessorParameters> resolverProcessorParameters; - private final String finalRepartitionTopicName; - private final String finalRepartitionSinkName; - private final String finalRepartitionSourceName; - private final Serde keySerde; - private final Serde> subResponseSerde; - private final KTableValueGetterSupplier originalValueGetter; - - public KTableKTableForeignKeyJoinResolutionNode(final String nodeName, - final ProcessorParameters> joinOneToOneProcessorParameters, - final ProcessorParameters> joinByPrefixProcessorParameters, - final ProcessorParameters> resolverProcessorParameters, - final String finalRepartitionTopicName, - final String finalRepartitionSinkName, - final String finalRepartitionSourceName, - final Serde keySerde, - final Serde> subResponseSerde, - final KTableValueGetterSupplier originalValueGetter - ) { - super(nodeName); - this.joinOneToOneProcessorParameters = joinOneToOneProcessorParameters; - this.joinByPrefixProcessorParameters = joinByPrefixProcessorParameters; - this.resolverProcessorParameters = resolverProcessorParameters; - this.finalRepartitionTopicName = finalRepartitionTopicName; - this.finalRepartitionSinkName = finalRepartitionSinkName; - this.finalRepartitionSourceName = finalRepartitionSourceName; - this.keySerde = keySerde; - this.subResponseSerde = subResponseSerde; - this.originalValueGetter = originalValueGetter; - } - - @Override - public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { - topologyBuilder.addInternalTopic(finalRepartitionTopicName, InternalTopicProperties.empty()); - //Repartition back to the original partitioning structure - topologyBuilder.addSink(finalRepartitionSinkName, finalRepartitionTopicName, - keySerde.serializer(), subResponseSerde.serializer(), - null, - joinByPrefixProcessorParameters.processorName(), joinOneToOneProcessorParameters.processorName()); - - topologyBuilder.addSource(null, finalRepartitionSourceName, new FailOnInvalidTimestamp(), - keySerde.deserializer(), subResponseSerde.deserializer(), finalRepartitionTopicName); - - //Connect highwaterProcessor to source, add the state store, and connect the statestore with the processor. - topologyBuilder.addProcessor(resolverProcessorParameters.processorName(), resolverProcessorParameters.processorSupplier(), finalRepartitionSourceName); - topologyBuilder.connectProcessorAndStateStores(resolverProcessorParameters.processorName(), originalValueGetter.storeNames()); - } -}