KAFKA-18026: KIP-1112, clean up StatefulProcessorNode (#18195)

Final cleanup of StatefulProcessorNode after converting all stateful operators to adding state stores via implementing the #stores method.
This commit is contained in:
A. Sophie Blee-Goldman 2025-01-14 17:42:58 -08:00 committed by GitHub
parent 118e1835cc
commit faef80a2e2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 249 additions and 382 deletions

View File

@ -30,8 +30,6 @@ import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import java.util.ArrayList;
@ -61,24 +59,25 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
new KStreamAggregate<>(storeFactory, initializer, kGroupedStream.getValue());
parentProcessors.add(parentProcessor);
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeFactory,
parentProcessor);
statefulProcessorNode.setOutputVersioned(isOutputVersioned);
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
final String kStreamAggProcessorName = named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME);
final ProcessorGraphNode<K, ?> aggProcessorNode =
new ProcessorGraphNode<>(
kStreamAggProcessorName,
new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName)
);
aggProcessorNode.setOutputVersioned(isOutputVersioned);
processors.add(aggProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName());
}
@ -96,7 +95,6 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
@ -107,17 +105,18 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
initializer,
kGroupedStream.getValue());
parentProcessors.add(parentProcessor);
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeFactory,
parentProcessor);
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
final String kStreamAggProcessorName = named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME);
final ProcessorGraphNode<K, ?> aggProcessorNode =
new ProcessorGraphNode<>(
kStreamAggProcessorName,
new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName)
);
processors.add(aggProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName());
}
@ -135,7 +134,6 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
@ -147,17 +145,17 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
kGroupedStream.getValue(),
sessionMerger);
parentProcessors.add(parentProcessor);
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeFactory,
parentProcessor);
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
final String kStreamAggProcessorName = named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME);
final ProcessorGraphNode<K, ?> aggProcessorNode =
new ProcessorGraphNode<>(
kStreamAggProcessorName,
new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName)
);
processors.add(aggProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName());
}
@ -174,7 +172,6 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
final Collection<GraphNode> processors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
@ -186,17 +183,17 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
initializer,
kGroupedStream.getValue());
parentProcessors.add(parentProcessor);
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeFactory,
parentProcessor);
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
final String kStreamAggProcessorName = named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME);
final ProcessorGraphNode<K, ?> aggProcessorNode =
new ProcessorGraphNode<>(
kStreamAggProcessorName,
new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName)
);
processors.add(aggProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName());
}
@ -262,30 +259,6 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
builder);
}
private StatefulProcessorNode<K, ?> getStatefulProcessorNode(final String processorName,
final boolean stateCreated,
final StoreFactory storeFactory,
final ProcessorSupplier<K, ?, K, ?> kStreamAggregate) {
final StatefulProcessorNode<K, ?> statefulProcessorNode;
if (!stateCreated) {
statefulProcessorNode =
new StatefulProcessorNode<>(
processorName,
new ProcessorParameters<>(kStreamAggregate, processorName),
storeFactory
);
} else {
statefulProcessorNode =
new StatefulProcessorNode<>(
processorName,
new ProcessorParameters<>(kStreamAggregate, processorName),
new String[]{storeFactory.storeName()}
);
}
return statefulProcessorNode;
}
@SuppressWarnings("unchecked")
private <VIn> void createRepartitionSource(final String repartitionTopicNamePrefix,
final OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder,

View File

@ -21,8 +21,8 @@ import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import java.util.Collections;
@ -97,11 +97,10 @@ class GroupedStreamAggregateBuilder<K, V> {
parentNode = repartitionNode;
}
final StatefulProcessorNode<K, V> statefulProcessorNode =
new StatefulProcessorNode<>(
final ProcessorGraphNode<K, V> statefulProcessorNode =
new ProcessorGraphNode<>(
aggFunctionName,
new ProcessorParameters<>(aggregateSupplier, aggFunctionName),
new String[] {storeFactory.storeName()}
new ProcessorParameters<>(aggregateSupplier, aggFunctionName)
);
statefulProcessorNode.setOutputVersioned(isOutputVersioned);

View File

@ -27,8 +27,8 @@ import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.GroupedTableOperationRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
@ -88,10 +88,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
// the passed in StreamsGraphNode must be the parent of the repartition node
builder.addGraphNode(this.graphNode, repartitionGraphNode);
final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>(
final ProcessorGraphNode statefulProcessorNode = new ProcessorGraphNode<>(
funcName,
new ProcessorParameters<>(aggregateSupplier, funcName),
new String[]{materialized.storeName()}
new ProcessorParameters<>(aggregateSupplier, funcName)
);
statefulProcessorNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier);

View File

@ -48,7 +48,7 @@ import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNo
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorToStateConnectorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamToTableNode;
@ -1225,7 +1225,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
final String name = new NamedInternal(named).name();
final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new
ProcessorToStateConnectorNode<>(
name,
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames);
@ -1270,7 +1271,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
final String name = new NamedInternal(named).name();
final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
name,
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames);

View File

@ -54,11 +54,10 @@ import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorToStateConnectorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
import org.apache.kafka.streams.kstream.internals.graph.TableFilterNode;
import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.TableRepartitionMapNode;
import org.apache.kafka.streams.kstream.internals.graph.TableSuppressNode;
import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
@ -69,7 +68,9 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -312,7 +313,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final ProcessorParameters<K, VR, ?, ?> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
new ProcessorParameters<>(processorSupplier, name)
);
final GraphNode tableNode = new TableProcessorNode<>(
final GraphNode tableNode = new ProcessorGraphNode<>(
name,
processorParameters
);
@ -439,7 +440,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final Serde<K> keySerde;
final Serde<VR> valueSerde;
final String queryableStoreName;
final StoreFactory storeFactory;
final Set<StoreBuilder<?>> storeBuilder;
if (materializedInternal != null) {
// don't inherit parent value serde, since this operation may change the value type, more specifically:
@ -449,12 +450,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
valueSerde = materializedInternal.valueSerde();
queryableStoreName = materializedInternal.queryableStoreName();
// only materialize if materialized is specified and it has queryable name
storeFactory = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)) : null;
final StoreFactory storeFactory = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)) : null;
storeBuilder = Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory));
} else {
keySerde = this.keySerde;
valueSerde = null;
queryableStoreName = null;
storeFactory = null;
storeBuilder = null;
}
final String name = namedInternal.orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
@ -464,14 +466,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
transformerSupplier,
queryableStoreName);
final ProcessorParameters<K, VR, ?, ?> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
new ProcessorParameters<>(processorSupplier, name)
);
final ProcessorParameters<K, VR, ?, ?> processorParameters =
unsafeCastProcessorParametersToCompletelyDifferentType(
new ProcessorParameters<>(
new StoreDelegatingProcessorSupplier<>(
processorSupplier,
storeBuilder),
name
));
final GraphNode tableNode = new TableProcessorNode<>(
final GraphNode tableNode = new ProcessorToStateConnectorNode<>(
name,
processorParameters,
storeFactory,
stateStoreNames
);
maybeSetOutputVersioned(tableNode, materializedInternal);
@ -574,9 +580,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final ProcessorGraphNode<K, Change<V>> node = new TableSuppressNode<>(
name,
new ProcessorParameters<>(suppressionSupplier, name),
new String[]{storeName}
new ProcessorParameters<>(suppressionSupplier, name)
);
node.setOutputVersioned(false);
builder.addGraphNode(graphNode, node);
@ -1235,26 +1241,24 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final String subscriptionReceiveName = renamed.suffixWithOrElseGet(
"-subscription-receive", builder, SUBSCRIPTION_PROCESSOR);
final StatefulProcessorNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode =
new StatefulProcessorNode<>(
final ProcessorGraphNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode =
new ProcessorGraphNode<>(
subscriptionReceiveName,
new ProcessorParameters<>(
new SubscriptionReceiveProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema),
subscriptionReceiveName),
new String[]{subscriptionStoreName}
subscriptionReceiveName)
);
builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);
final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetter = ((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier();
final StatefulProcessorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode =
new StatefulProcessorNode<>(
final ProcessorToStateConnectorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode =
new ProcessorToStateConnectorNode<>(
new ProcessorParameters<>(
new SubscriptionJoinProcessorSupplier<>(
foreignKeyValueGetter
),
renamed.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR)
),
Collections.emptySet(),
Collections.singleton(foreignKeyValueGetter)
);
builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinNode);
@ -1306,7 +1310,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);
final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
final StatefulProcessorNode<K, SubscriptionResponseWrapper<VO>> responseJoinNode = new StatefulProcessorNode<>(
final ProcessorToStateConnectorNode<K, SubscriptionResponseWrapper<VO>> responseJoinNode = new ProcessorToStateConnectorNode<>(
new ProcessorParameters<>(
new ResponseJoinProcessorSupplier<>(
primaryKeyValueGetter,
@ -1317,7 +1321,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
),
renamed.suffixWithOrElseGet("-subscription-response-resolver", builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR)
),
Collections.emptySet(),
Collections.singleton(primaryKeyValueGetter)
);
builder.addGraphNode(foreignResponseSource, responseJoinNode);
@ -1339,7 +1342,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(materializedInternal);
final TableProcessorNode<K, VR> resultNode = new TableProcessorNode<>(
final ProcessorGraphNode<K, VR> resultNode = new ProcessorGraphNode<>(
resultProcessorName,
new ProcessorParameters<>(
resultProcessorSupplier,

View File

@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier;
public class ForeignJoinSubscriptionSendNode<K, V> extends ProcessorGraphNode<K, V> implements VersionedSemanticsGraphNode {
public ForeignJoinSubscriptionSendNode(final ProcessorParameters<K, V, ?, ?> processorParameters) {
super(processorParameters);
super(processorParameters.processorName(), processorParameters);
}
@SuppressWarnings("unchecked")

View File

@ -72,8 +72,8 @@ public final class GraphGraceSearchUtil {
@SuppressWarnings("rawtypes")
private static Long extractGracePeriod(final GraphNode node) {
if (node instanceof StatefulProcessorNode) {
final ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().processorSupplier();
if (node instanceof ProcessorGraphNode) {
final ProcessorSupplier processorSupplier = ((ProcessorGraphNode) node).processorParameters().processorSupplier();
if (processorSupplier instanceof KStreamWindowAggregate) {
final KStreamWindowAggregate kStreamWindowAggregate = (KStreamWindowAggregate) processorSupplier;
final Windows windows = kStreamWindowAggregate.windows();

View File

@ -28,13 +28,6 @@ public class ProcessorGraphNode<K, V> extends GraphNode {
private final ProcessorParameters<K, V, ?, ?> processorParameters;
public ProcessorGraphNode(final ProcessorParameters<K, V, ?, ?> processorParameters) {
super(processorParameters.processorName());
this.processorParameters = processorParameters;
}
public ProcessorGraphNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters) {

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import java.util.Arrays;
import java.util.Set;
/**
* Used for stateful processors that need to be manually connected to the state store(s)
* they need to access. This should only be used in cases where the stores) cannot
* be connected automatically by implementing the {@link ConnectedStoreProvider#stores()} method
* and returning the store directly. Generally this will only apply to DSL operators that utilize
* value getters to access another processor's state store(s), and the process/processValues
* operator where the user's custom processor supplier doesn't implement the #stores method
* and has to be connected to the store when compiling the topology.
*/
public class ProcessorToStateConnectorNode<K, V> extends ProcessorGraphNode<K, V> {
private final String[] storeNames;
/**
* Create a node representing a stateful processor that uses value getters to access stores, and needs to
* be connected with those stores
*/
public ProcessorToStateConnectorNode(final ProcessorParameters<K, V, ?, ?> processorParameters,
final Set<KTableValueGetterSupplier<?, ?>> valueGetterSuppliers) {
super(processorParameters.processorName(), processorParameters);
storeNames = valueGetterSuppliers.stream().flatMap(s -> Arrays.stream(s.storeNames())).toArray(String[]::new);
}
/**
* Create a node representing a stateful processor, which needs to be connected to the provided stores
*/
public ProcessorToStateConnectorNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters,
final String[] storeNames) {
super(nodeName, processorParameters);
this.storeNames = storeNames;
}
@Override
public String toString() {
return "ProcessorNode{" +
"storeNames=" + Arrays.toString(storeNames) +
"} " + super.toString();
}
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
processorParameters().addProcessorTo(topologyBuilder, parentNodeNames());
if (storeNames != null && storeNames.length > 0) {
topologyBuilder.connectProcessorAndStateStores(processorParameters().processorName(), storeNames);
}
}
}

View File

@ -1,92 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Stream;
public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
private final String[] storeNames;
private final StoreFactory storeFactory;
/**
* Create a node representing a stateful processor, where the named stores have already been registered.
*/
public StatefulProcessorNode(final ProcessorParameters<K, V, ?, ?> processorParameters,
final Set<StoreBuilder<?>> preRegisteredStores,
final Set<KTableValueGetterSupplier<?, ?>> valueGetterSuppliers) {
super(processorParameters.processorName(), processorParameters);
final Stream<String> registeredStoreNames = preRegisteredStores.stream().map(StoreBuilder::name);
final Stream<String> valueGetterStoreNames = valueGetterSuppliers.stream().flatMap(s -> Arrays.stream(s.storeNames()));
storeNames = Stream.concat(registeredStoreNames, valueGetterStoreNames).toArray(String[]::new);
storeFactory = null;
}
/**
* Create a node representing a stateful processor, where the named stores have already been registered.
*/
public StatefulProcessorNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters,
final String[] storeNames) {
super(nodeName, processorParameters);
this.storeNames = storeNames;
this.storeFactory = null;
}
/**
* Create a node representing a stateful processor,
* where the store needs to be built and registered as part of building this node.
*/
public StatefulProcessorNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters,
final StoreFactory materializedKTableStoreBuilder) {
super(nodeName, processorParameters);
this.storeNames = null;
this.storeFactory = materializedKTableStoreBuilder;
}
@Override
public String toString() {
return "StatefulProcessorNode{" +
"storeNames=" + Arrays.toString(storeNames) +
", storeBuilder=" + storeFactory +
"} " + super.toString();
}
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
processorParameters().addProcessorTo(topologyBuilder, parentNodeNames());
if (storeNames != null && storeNames.length > 0) {
topologyBuilder.connectProcessorAndStateStores(processorParameters().processorName(), storeNames);
}
if (storeFactory != null) {
topologyBuilder.addStateStore(storeFactory, processorParameters().processorName());
}
}
}

View File

@ -20,11 +20,11 @@ package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.kstream.internals.KTableFilter;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
public class TableFilterNode<K, V> extends TableProcessorNode<K, V> implements VersionedSemanticsGraphNode {
public class TableFilterNode<K, V> extends ProcessorGraphNode<K, V> implements VersionedSemanticsGraphNode {
public TableFilterNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters) {
super(nodeName, processorParameters, null, null);
super(nodeName, processorParameters);
}
@SuppressWarnings("unchecked")

View File

@ -1,76 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import java.util.Arrays;
public class TableProcessorNode<K, V> extends GraphNode {
private final ProcessorParameters<K, V, ?, ?> processorParameters;
private final StoreFactory storeFactory;
private final String[] storeNames;
public TableProcessorNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters) {
this(nodeName, processorParameters, null, null);
}
public TableProcessorNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters,
final StoreFactory storeFactory,
final String[] storeNames) {
super(nodeName);
this.processorParameters = processorParameters;
this.storeFactory = storeFactory;
this.storeNames = storeNames != null ? storeNames : new String[] {};
}
public ProcessorParameters<K, V, ?, ?> processorParameters() {
return processorParameters;
}
@Override
public String toString() {
return "TableProcessorNode{" +
", processorParameters=" + processorParameters +
", storeFactory=" + (storeFactory == null ? "null" : storeFactory.storeName()) +
", storeNames=" + Arrays.toString(storeNames) +
"} " + super.toString();
}
@SuppressWarnings("unchecked")
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
processorParameters.addProcessorTo(topologyBuilder, parentNodeNames());
final String processorName = processorParameters.processorName();
if (storeNames.length > 0) {
// todo(rodesai): remove me once all operators have been moved to ProcessorSupplier
topologyBuilder.connectProcessorAndStateStores(processorName, storeNames);
}
if (storeFactory != null) {
// todo(rodesai) remove when KTableImpl#doFilter, KTableImpl#doTransformValues moved to ProcessorSupplier
topologyBuilder.addStateStore(storeFactory, processorName);
}
}
}

View File

@ -16,10 +16,12 @@
*/
package org.apache.kafka.streams.kstream.internals.graph;
public class TableSuppressNode<K, V> extends StatefulProcessorNode<K, V> {
/**
* Marker interface to identify suppression nodes since they have some special requirements
*/
public class TableSuppressNode<K, V> extends ProcessorGraphNode<K, V> {
public TableSuppressNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters,
final String[] storeNames) {
super(nodeName, processorParameters, storeNames);
final ProcessorParameters<K, V, ?, ?> processorParameters) {
super(nodeName, processorParameters);
}
}

View File

@ -25,7 +25,6 @@ import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.junit.jupiter.api.Test;
@ -50,8 +49,20 @@ public class GraphGraceSearchUtilTest {
public void shouldFailIfThereIsNoGraceAncestor() {
// doesn't matter if this ancestor is stateless or stateful. The important thing it that there is
// no grace period defined on any ancestor of the node
final StatefulProcessorNode<String, Long> gracelessAncestor = new StatefulProcessorNode<>(
"stateful",
final ProcessorGraphNode<String, Long> gracelessAncestor = new ProcessorGraphNode<>(
"graceless",
new ProcessorParameters<>(
() -> new Processor<String, Long, String, Long>() {
@Override
public void process(final Record<String, Long> record) {}
},
"graceless"
)
);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
"stateless",
new ProcessorParameters<>(
() -> new Processor<String, Long, String, Long>() {
@ -59,26 +70,24 @@ public class GraphGraceSearchUtilTest {
public void process(final Record<String, Long> record) {}
},
"dummy"
),
(StoreFactory) null
"stateless"
)
);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
gracelessAncestor.addChild(node);
try {
GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
fail("should have thrown.");
} catch (final TopologyException e) {
assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations. Got [stateful->stateless]."));
assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations. Got [graceless->stateless]."));
}
}
@Test
public void shouldExtractGraceFromKStreamWindowAggregateNode() {
final TimeWindows windows = TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(1234L));
final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>(
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
"asdf",
new ProcessorParameters<>(
new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
@ -89,8 +98,7 @@ public class GraphGraceSearchUtilTest {
null
),
"asdf"
),
(StoreFactory) null
)
);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@ -101,7 +109,7 @@ public class GraphGraceSearchUtilTest {
public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() {
final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>(
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
"asdf",
new ProcessorParameters<>(
new KStreamSessionWindowAggregate<String, Long, Integer>(
@ -113,8 +121,7 @@ public class GraphGraceSearchUtilTest {
null
),
"asdf"
),
(StoreFactory) null
)
);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@ -124,15 +131,14 @@ public class GraphGraceSearchUtilTest {
@Test
public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() {
final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
final ProcessorGraphNode<String, Long> graceGrandparent = new ProcessorGraphNode<>(
"asdf",
new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
windows, mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null, null
), "asdf"),
(StoreFactory) null
), "asdf")
);
final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>(
final ProcessorGraphNode<String, Long> statefulParent = new ProcessorGraphNode<>(
"stateful",
new ProcessorParameters<>(
() -> new Processor<String, Long, String, Long>() {
@ -142,12 +148,22 @@ public class GraphGraceSearchUtilTest {
},
"dummy"
),
(StoreFactory) null
)
);
graceGrandparent.addChild(statefulParent);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
"stateless",
new ProcessorParameters<>(
() -> new Processor<String, Long, String, Long>() {
@Override
public void process(final Record<String, Long> record) {}
},
"dummyChild-graceless"
)
);
statefulParent.addChild(node);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@ -157,7 +173,7 @@ public class GraphGraceSearchUtilTest {
@Test
public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() {
final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
final ProcessorGraphNode<String, Long> graceGrandparent = new ProcessorGraphNode<>(
"asdf",
new ProcessorParameters<>(
new KStreamSessionWindowAggregate<String, Long, Integer>(
@ -169,14 +185,35 @@ public class GraphGraceSearchUtilTest {
null
),
"asdf"
),
(StoreFactory) null
)
);
final ProcessorGraphNode<String, Long> statelessParent = new ProcessorGraphNode<>("stateless", null);
final ProcessorGraphNode<String, Long> statelessParent = new ProcessorGraphNode<>(
"statelessParent",
new ProcessorParameters<>(
() -> new Processor<String, Long, String, Long>() {
@Override
public void process(final Record<String, Long> record) {}
},
"statelessParent"
)
);
graceGrandparent.addChild(statelessParent);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
"stateless",
new ProcessorParameters<>(
() -> new Processor<String, Long, String, Long>() {
@Override
public void process(final Record<String, Long> record) {}
},
"stateless"
)
);
statelessParent.addChild(node);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@ -185,7 +222,7 @@ public class GraphGraceSearchUtilTest {
@Test
public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() {
final StatefulProcessorNode<String, Long> leftParent = new StatefulProcessorNode<>(
final ProcessorGraphNode<String, Long> leftParent = new ProcessorGraphNode<>(
"asdf",
new ProcessorParameters<>(
new KStreamSessionWindowAggregate<String, Long, Integer>(
@ -197,11 +234,10 @@ public class GraphGraceSearchUtilTest {
null
),
"asdf"
),
(StoreFactory) null
)
);
final StatefulProcessorNode<String, Long> rightParent = new StatefulProcessorNode<>(
final ProcessorGraphNode<String, Long> rightParent = new ProcessorGraphNode<>(
"asdf",
new ProcessorParameters<>(
new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
@ -212,11 +248,21 @@ public class GraphGraceSearchUtilTest {
null
),
"asdf"
),
(StoreFactory) null
)
);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
"stateless",
new ProcessorParameters<>(
() -> new Processor<String, Long, String, Long>() {
@Override
public void process(final Record<String, Long> record) {}
},
"stateless"
)
);
leftParent.addChild(node);
rightParent.addChild(node);

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TableProcessorNodeTest {
private static class TestProcessor implements Processor<String, String, String, String> {
@Override
public void process(final Record<String, String> record) {
}
}
@Test
public void shouldConvertToStringWithNullStoreBuilder() {
final TableProcessorNode<String, String> node = new TableProcessorNode<>(
"name",
new ProcessorParameters<>(TestProcessor::new, "processor"),
null,
new String[]{"store1", "store2"}
);
final String asString = node.toString();
final String expected = "storeFactory=null";
assertTrue(
asString.contains(expected),
String.format(
"Expected toString to return string with \"%s\", received: %s",
expected,
asString)
);
}
}