diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 92cfdb04425..0ba9086e450 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -659,8 +659,7 @@ public class KStreamImpl extends AbstractStream implements KStream processorParameters = new ProcessorParameters<>(tableSource, name); final GraphNode tableNode = new StreamToTableNode<>( name, - processorParameters, - materializedInternal + processorParameters ); tableNode.setOutputVersioned(materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java index a6c825be0c8..08c171e824a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java @@ -17,13 +17,7 @@ package org.apache.kafka.streams.kstream.internals.graph; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.internals.KTableSource; -import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer; -import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.processor.internals.StoreFactory; -import org.apache.kafka.streams.state.KeyValueStore; /** * Represents a KTable convert From KStream @@ -31,37 +25,20 @@ import org.apache.kafka.streams.state.KeyValueStore; public class StreamToTableNode extends GraphNode { private final ProcessorParameters processorParameters; - private final MaterializedInternal materializedInternal; public StreamToTableNode(final String nodeName, - final ProcessorParameters processorParameters, - final MaterializedInternal materializedInternal) { + final ProcessorParameters processorParameters) { super(nodeName); this.processorParameters = processorParameters; - this.materializedInternal = materializedInternal; } @Override public String toString() { - return "StreamToTableNode{" + - ", processorParameters=" + processorParameters + - ", materializedInternal=" + materializedInternal + - "} " + super.toString(); + return "StreamToTableNode{" + super.toString() + "}"; } - @SuppressWarnings("unchecked") @Override public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { - final StoreFactory storeFactory = - new KeyValueStoreMaterializer<>((MaterializedInternal>) materializedInternal); - - final String processorName = processorParameters.processorName(); - final KTableSource tableSource = processorParameters.processorSupplier() instanceof KTableSource ? - (KTableSource) processorParameters.processorSupplier() : null; - topologyBuilder.addProcessor(processorName, processorParameters.processorSupplier(), parentNodeNames()); - - if (storeFactory != null && tableSource.materialized()) { - topologyBuilder.addStateStore(storeFactory, processorName); - } + processorParameters.addProcessorTo(topologyBuilder, parentNodeNames()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 5371d53949f..00c745fd735 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -1686,16 +1686,16 @@ public class StreamsBuilderTest { .selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3 .peek((k, v) -> { }, Named.as("peek")) // wrapped 4 .flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // wrapped 5 - .toTable(Named.as("toTable")) // should be wrapped when we do StreamToTableNode + .toTable(Named.as("toTable")) // wrapped 6 .filter((k, v) -> true, Named.as("filter-table")) // should be wrapped once we do TableProcessorNode .toStream(Named.as("toStream")) // wrapped 7 .to("output", Produced.as("sink")); builder.build(); - assertThat(counter.numWrappedProcessors(), CoreMatchers.is(7)); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(8)); assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( "filter-stream", "map", "selectKey", "peek", "flatMap", - "toTable-repartition-filter", "toStream" + "toTable-repartition-filter", "toStream", "toTable" )); assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0)); assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0));