KAFKA-18026: KIP-1112 convert StreamToTableNode (#18149)

Covers wrapping of processors and state stores for StreamToTableSource

Reviewers: Guozhang Wang <guozhang.wang.us@gmail.com>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Almog Gavra 2024-12-12 14:52:21 -08:00 committed by GitHub
parent 3b1bd3812e
commit 9b776ffc50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 7 additions and 31 deletions

View File

@ -659,8 +659,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(tableSource, name); final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(tableSource, name);
final GraphNode tableNode = new StreamToTableNode<>( final GraphNode tableNode = new StreamToTableNode<>(
name, name,
processorParameters, processorParameters
materializedInternal
); );
tableNode.setOutputVersioned(materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier); tableNode.setOutputVersioned(materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier);

View File

@ -17,13 +17,7 @@
package org.apache.kafka.streams.kstream.internals.graph; package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.KeyValueStore;
/** /**
* Represents a KTable convert From KStream * Represents a KTable convert From KStream
@ -31,37 +25,20 @@ import org.apache.kafka.streams.state.KeyValueStore;
public class StreamToTableNode<K, V> extends GraphNode { public class StreamToTableNode<K, V> extends GraphNode {
private final ProcessorParameters<K, V, ?, ?> processorParameters; private final ProcessorParameters<K, V, ?, ?> processorParameters;
private final MaterializedInternal<K, V, ?> materializedInternal;
public StreamToTableNode(final String nodeName, public StreamToTableNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters, final ProcessorParameters<K, V, ?, ?> processorParameters) {
final MaterializedInternal<K, V, ?> materializedInternal) {
super(nodeName); super(nodeName);
this.processorParameters = processorParameters; this.processorParameters = processorParameters;
this.materializedInternal = materializedInternal;
} }
@Override @Override
public String toString() { public String toString() {
return "StreamToTableNode{" + return "StreamToTableNode{" + super.toString() + "}";
", processorParameters=" + processorParameters +
", materializedInternal=" + materializedInternal +
"} " + super.toString();
} }
@SuppressWarnings("unchecked")
@Override @Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
final StoreFactory storeFactory = processorParameters.addProcessorTo(topologyBuilder, parentNodeNames());
new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal);
final String processorName = processorParameters.processorName();
final KTableSource<K, V> tableSource = processorParameters.processorSupplier() instanceof KTableSource ?
(KTableSource<K, V>) processorParameters.processorSupplier() : null;
topologyBuilder.addProcessor(processorName, processorParameters.processorSupplier(), parentNodeNames());
if (storeFactory != null && tableSource.materialized()) {
topologyBuilder.addStateStore(storeFactory, processorName);
}
} }
} }

View File

@ -1686,16 +1686,16 @@ public class StreamsBuilderTest {
.selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3 .selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3
.peek((k, v) -> { }, Named.as("peek")) // wrapped 4 .peek((k, v) -> { }, Named.as("peek")) // wrapped 4
.flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // wrapped 5 .flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // wrapped 5
.toTable(Named.as("toTable")) // should be wrapped when we do StreamToTableNode .toTable(Named.as("toTable")) // wrapped 6
.filter((k, v) -> true, Named.as("filter-table")) // should be wrapped once we do TableProcessorNode .filter((k, v) -> true, Named.as("filter-table")) // should be wrapped once we do TableProcessorNode
.toStream(Named.as("toStream")) // wrapped 7 .toStream(Named.as("toStream")) // wrapped 7
.to("output", Produced.as("sink")); .to("output", Produced.as("sink"));
builder.build(); builder.build();
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(7)); assertThat(counter.numWrappedProcessors(), CoreMatchers.is(8));
assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
"filter-stream", "map", "selectKey", "peek", "flatMap", "filter-stream", "map", "selectKey", "peek", "flatMap",
"toTable-repartition-filter", "toStream" "toTable-repartition-filter", "toStream", "toTable"
)); ));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0)); assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0)); assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0));