KAFKA-18026: KIP-1112, skip re-registering aggregate stores in StatefulProcessorNode (#18015)

Minor follow-up to #17929 based on this discussion.

Also includes some very minor refactoring/renaming on the side. The only real change is in the KGroupedStreamImpl class (sketched below).
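
A simplified sketch of that change (trimmed from the KGroupedStreamImpl diff below, generics and unrelated parameters elided): the aggregate/reduce/count paths now build a single KeyValueStoreMaterializer up front and hand the same factory to both the processor supplier and doAggregate, so the store is materialized once and the downstream StatefulProcessorNode only connects to it by name instead of registering the factory again.

    // Sketch only: the reworked KGroupedStreamImpl.aggregate(), heavily trimmed.
    final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
    final KeyValueStoreMaterializer<K, VR> storeFactory = new KeyValueStoreMaterializer<>(materializedInternal);
    return doAggregate(
        new KStreamAggregate<>(storeFactory, initializer, aggregator), // same factory ...
        name,
        storeFactory                                                   // ... passed once, so the store is not re-registered
    );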

Reviewers: Guozhang Wang <guozhang.wang.us@gmail.com>
A. Sophie Blee-Goldman committed 2024-12-03 22:18:55 -08:00 (via GitHub)
commit 31d97bc3c9, parent 095bd0a6d4
25 changed files with 118 additions and 115 deletions
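
Most of the diff below is mechanical fallout from two changes to the internal StoreFactory contract (see the StoreFactory.java hunks further down): name() becomes storeName(), and build() is replaced by builder(), which hands back the StoreBuilder rather than a built StateStore. Condensed (a sketch, not the full interface):

    // Condensed sketch of the StoreFactory change; unrelated methods omitted.
    import org.apache.kafka.streams.state.StoreBuilder;

    public interface StoreFactory {
        StoreBuilder<?> builder();  // was: StateStore build()
        String storeName();         // was: String name()
    }

Call sites that previously invoked storeFactory.build() now go through storeFactory.builder().build(), as seen in the InternalTopologyBuilder hunks.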

File: CogroupedStreamAggregateBuilder.java

@ -58,7 +58,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde,
final String queryableName,
final boolean isOutputVersioned) {
processRepartitions(groupPatterns, storeFactory.name());
processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
@ -80,7 +80,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.name());
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName());
}
@SuppressWarnings("unchecked")
@ -92,7 +92,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde,
final String queryableName,
final Windows<W> windows) {
processRepartitions(groupPatterns, storeFactory.name());
processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
@ -119,7 +119,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.name());
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName());
}
@SuppressWarnings("unchecked")
@ -132,7 +132,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final String queryableName,
final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {
processRepartitions(groupPatterns, storeFactory.name());
processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
@ -159,7 +159,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.name());
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName());
}
@SuppressWarnings("unchecked")
@ -171,7 +171,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde,
final String queryableName,
final SlidingWindows slidingWindows) {
processRepartitions(groupPatterns, storeFactory.name());
processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
final Collection<GraphNode> processors = new ArrayList<>();
boolean stateCreated = false;
@ -198,7 +198,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.name());
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName());
}
private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
@ -279,7 +279,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
new StatefulProcessorNode<>(
processorName,
new ProcessorParameters<>(kStreamAggregate, processorName),
new String[]{storeFactory.name()}
new String[]{storeFactory.storeName()}
);
}

File: GroupedStreamAggregateBuilder.java

@ -73,7 +73,7 @@ class GroupedStreamAggregateBuilder<K, V> {
final Serde<KR> keySerde,
final Serde<VR> valueSerde,
final boolean isOutputVersioned) {
assert queryableStoreName == null || queryableStoreName.equals(storeFactory.name());
assert queryableStoreName == null || queryableStoreName.equals(storeFactory.storeName());
final String aggFunctionName = functionName.name();
@ -82,7 +82,7 @@ class GroupedStreamAggregateBuilder<K, V> {
if (repartitionRequired) {
final OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeFactory.name();
final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeFactory.storeName();
sourceName = createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder);
// First time through we need to create a repartition node.
@ -101,7 +101,7 @@ class GroupedStreamAggregateBuilder<K, V> {
new StatefulProcessorNode<>(
aggFunctionName,
new ProcessorParameters<>(aggregateSupplier, aggFunctionName),
storeFactory
new String[] {storeFactory.storeName()}
);
statefulProcessorNode.setOutputVersioned(isOutputVersioned);

File: KGroupedStreamImpl.java

@ -97,10 +97,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
}
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
final KeyValueStoreMaterializer<K, V> storeFactory = new KeyValueStoreMaterializer<>(materializedInternal);
return doAggregate(
new KStreamReduce<>(materializedInternal, reducer),
new KStreamReduce<>(storeFactory, reducer),
name,
materializedInternal
storeFactory
);
}
@ -129,10 +131,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
}
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
final KeyValueStoreMaterializer<K, VR> storeFactory = new KeyValueStoreMaterializer<>(materializedInternal);
return doAggregate(
new KStreamAggregate<>(materializedInternal, initializer, aggregator),
new KStreamAggregate<>(storeFactory, initializer, aggregator),
name,
materializedInternal
storeFactory
);
}
@ -183,10 +187,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
}
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
final KeyValueStoreMaterializer<K, Long> storeFactory = new KeyValueStoreMaterializer<>(materializedInternal);
return doAggregate(
new KStreamAggregate<>(materializedInternal, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
new KStreamAggregate<>(storeFactory, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
name,
materializedInternal);
storeFactory);
}
@Override
@ -236,15 +242,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, V, K, T> aggregateSupplier,
final String functionName,
final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
final KeyValueStoreMaterializer<K, T> storeFactory) {
return aggregateBuilder.build(
new NamedInternal(functionName),
new KeyValueStoreMaterializer<>(materializedInternal),
storeFactory,
aggregateSupplier,
materializedInternal.queryableStoreName(),
materializedInternal.keySerde(),
materializedInternal.valueSerde(),
materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier);
storeFactory.queryableStoreName(),
storeFactory.keySerde(),
storeFactory.valueSerde(),
storeFactory.storeSupplier() instanceof VersionedBytesStoreSupplier);
}
@Override

File: KStreamAggregate.java

@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
@ -28,7 +27,6 @@ import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
@ -55,20 +53,11 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupp
private boolean sendOldValues = false;
KStreamAggregate(final MaterializedInternal<KIn, VAgg, KeyValueStore<Bytes, byte[]>> materialized,
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
this.storeName = materialized.storeName();
this.initializer = initializer;
this.aggregator = aggregator;
}
KStreamAggregate(final StoreFactory storeFactory,
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
this.storeFactory = storeFactory;
this.storeName = storeFactory.name();
this.storeName = storeFactory.storeName();
this.initializer = initializer;
this.aggregator = aggregator;
}

File: KStreamImplJoin.java

@ -146,16 +146,16 @@ class KStreamImplJoin {
otherWindowStore = joinWindowStoreBuilderFromSupplier(otherStoreSupplier, streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde());
}
final KStreamJoinWindow<K, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindowStore.name());
final KStreamJoinWindow<K, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindowStore.storeName());
final ProcessorParameters<K, V1, ?, ?> thisWindowStreamProcessorParams = new ProcessorParameters<>(thisWindowedStream, thisWindowStreamProcessorName);
final ProcessorGraphNode<K, V1> thisWindowedStreamsNode = new WindowedStreamProcessorNode<>(thisWindowStore.name(), thisWindowStreamProcessorParams);
final ProcessorGraphNode<K, V1> thisWindowedStreamsNode = new WindowedStreamProcessorNode<>(thisWindowStore.storeName(), thisWindowStreamProcessorParams);
builder.addGraphNode(thisGraphNode, thisWindowedStreamsNode);
final KStreamJoinWindow<K, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindowStore.name());
final KStreamJoinWindow<K, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindowStore.storeName());
final ProcessorParameters<K, V2, ?, ?> otherWindowStreamProcessorParams = new ProcessorParameters<>(otherWindowedStream, otherWindowStreamProcessorName);
final ProcessorGraphNode<K, V2> otherWindowedStreamsNode = new WindowedStreamProcessorNode<>(otherWindowStore.name(), otherWindowStreamProcessorParams);
final ProcessorGraphNode<K, V2> otherWindowedStreamsNode = new WindowedStreamProcessorNode<>(otherWindowStore.storeName(), otherWindowStreamProcessorParams);
builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
Optional<StoreFactory> outerJoinWindowStore = Optional.empty();
@ -173,25 +173,25 @@ class KStreamImplJoin {
final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
final KStreamKStreamJoinLeftSide<K, V1, V2, VOut> joinThis = new KStreamKStreamJoinLeftSide<>(
otherWindowStore.name(),
otherWindowStore.storeName(),
internalWindows,
joiner,
leftOuter,
outerJoinWindowStore.map(StoreFactory::name),
outerJoinWindowStore.map(StoreFactory::storeName),
sharedTimeTrackerSupplier
);
final KStreamKStreamJoinRightSide<K, V1, V2, VOut> joinOther = new KStreamKStreamJoinRightSide<>(
thisWindowStore.name(),
thisWindowStore.storeName(),
internalWindows,
AbstractStream.reverseJoinerWithKey(joiner),
rightOuter,
outerJoinWindowStore.map(StoreFactory::name),
outerJoinWindowStore.map(StoreFactory::storeName),
sharedTimeTrackerSupplier
);
final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new KStreamKStreamSelfJoin<>(
thisWindowStore.name(),
thisWindowStore.storeName(),
internalWindows,
joiner,
windows.size() + windows.gracePeriodMs()

File: KStreamReduce.java

@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
@ -27,7 +26,6 @@ import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
@ -53,9 +51,9 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, V, K,
private boolean sendOldValues = false;
KStreamReduce(final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized, final Reducer<V> reducer) {
this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
this.storeName = materialized.storeName();
KStreamReduce(final StoreFactory storeFactory, final Reducer<V> reducer) {
this.storeFactory = storeFactory;
this.storeName = storeFactory.storeName();
this.reducer = reducer;
}

File: KStreamSessionWindowAggregate.java

@ -75,7 +75,7 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
final Aggregator<? super KIn, ? super VIn, VAgg> aggregator,
final Merger<? super KIn, VAgg> sessionMerger) {
this.windows = windows;
this.storeName = storeFactory.name();
this.storeName = storeFactory.storeName();
this.storeFactory = storeFactory;
this.emitStrategy = emitStrategy;
this.initializer = initializer;

File: KStreamSlidingWindowAggregate.java

@ -64,7 +64,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
this.windows = windows;
this.storeName = storeFactory.name();
this.storeName = storeFactory.storeName();
this.storeFactory = storeFactory;
this.initializer = initializer;
this.aggregator = aggregator;

File: KStreamWindowAggregate.java

@ -63,7 +63,7 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
this.windows = windows;
this.storeName = storeFactory.name();
this.storeName = storeFactory.storeName();
this.storeFactory = storeFactory;
this.emitStrategy = emitStrategy;
this.initializer = initializer;

File: KeyValueStoreMaterializer.java

@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslKeyValueParams;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
@ -44,7 +43,7 @@ public class KeyValueStoreMaterializer<K, V> extends MaterializedStoreFactory<K,
}
@Override
public StateStore build() {
public StoreBuilder<?> builder() {
final KeyValueBytesStoreSupplier supplier = materialized.storeSupplier() == null
? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName(), true))
: (KeyValueBytesStoreSupplier) materialized.storeSupplier();
@ -77,7 +76,7 @@ public class KeyValueStoreMaterializer<K, V> extends MaterializedStoreFactory<K,
}
return builder.build();
return builder;
}
@Override

File: MaterializedStoreFactory.java

@ -16,8 +16,10 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.StoreSupplier;
import java.util.Map;
@ -39,10 +41,26 @@ public abstract class MaterializedStoreFactory<K, V, S extends StateStore> exten
}
@Override
public String name() {
public String storeName() {
return materialized.storeName();
}
public String queryableStoreName() {
return materialized.queryableStoreName();
}
public Serde<K> keySerde() {
return materialized.keySerde();
}
public Serde<V> valueSerde() {
return materialized.valueSerde();
}
public StoreSupplier<S> storeSupplier() {
return materialized.storeSupplier();
}
@Override
public Map<String, String> logConfig() {
return materialized.logConfig();

File: OuterStreamJoinStoreFactory.java

@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslKeyValueParams;
import org.apache.kafka.streams.state.DslStoreSuppliers;
@ -73,7 +72,7 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends AbstractConfigurable
}
@Override
public StateStore build() {
public StoreBuilder<?> builder() {
final Duration retentionPeriod = Duration.ofMillis(retentionPeriod());
final Duration windowSize = Duration.ofMillis(windows.size());
final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
@ -135,7 +134,7 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends AbstractConfigurable
builder.withLoggingDisabled();
}
return builder.build();
return builder;
}
@Override
@ -155,7 +154,7 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends AbstractConfigurable
}
@Override
public String name() {
public String storeName() {
return name;
}

File: SessionStoreMaterializer.java

@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslSessionParams;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
@ -58,7 +57,7 @@ public class SessionStoreMaterializer<K, V> extends MaterializedStoreFactory<K,
}
@Override
public StateStore build() {
public StoreBuilder<?> builder() {
final SessionBytesStoreSupplier supplier = materialized.storeSupplier() == null
? dslStoreSuppliers().sessionStore(new DslSessionParams(
materialized.storeName(),
@ -85,7 +84,7 @@ public class SessionStoreMaterializer<K, V> extends MaterializedStoreFactory<K,
builder.withCachingDisabled();
}
return builder.build();
return builder;
}
@Override

File: SlidingWindowStoreMaterializer.java

@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@ -58,7 +57,7 @@ public class SlidingWindowStoreMaterializer<K, V> extends MaterializedStoreFacto
}
@Override
public StateStore build() {
public StoreBuilder<?> builder() {
final WindowBytesStoreSupplier supplier = materialized.storeSupplier() == null
? dslStoreSuppliers().windowStore(new DslWindowParams(
materialized.storeName(),
@ -91,7 +90,7 @@ public class SlidingWindowStoreMaterializer<K, V> extends MaterializedStoreFacto
builder.withCachingDisabled();
}
return builder.build();
return builder;
}
@Override

File: StreamJoinedStoreFactory.java

@ -20,7 +20,6 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.StoreBuilder;
@ -81,7 +80,7 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends AbstractConfigurableSto
}
@Override
public StateStore build() {
public StoreBuilder<?> builder() {
final WindowBytesStoreSupplier supplier = storeSupplier == null
? dslStoreSuppliers().windowStore(new DslWindowParams(
this.name,
@ -106,7 +105,7 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends AbstractConfigurableSto
builder.withLoggingDisabled();
}
return builder.build();
return builder;
}
@Override
@ -126,7 +125,7 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends AbstractConfigurableSto
}
@Override
public String name() {
public String storeName() {
return name;
}

File: SubscriptionStoreFactory.java

@ -20,7 +20,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslKeyValueParams;
import org.apache.kafka.streams.state.StoreBuilder;
@ -45,7 +44,7 @@ public class SubscriptionStoreFactory<K> extends AbstractConfigurableStoreFactor
}
@Override
public StateStore build() {
public StoreBuilder<?> builder() {
StoreBuilder<?> builder;
builder = Stores.timestampedKeyValueStoreBuilder(
dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name, true)),
@ -58,7 +57,7 @@ public class SubscriptionStoreFactory<K> extends AbstractConfigurableStoreFactor
builder = builder.withLoggingDisabled();
}
builder = builder.withCachingDisabled();
return builder.build();
return builder;
}
@Override
@ -78,7 +77,7 @@ public class SubscriptionStoreFactory<K> extends AbstractConfigurableStoreFactor
}
@Override
public String name() {
public String storeName() {
return name;
}

File: WindowStoreMaterializer.java

@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@ -56,7 +55,7 @@ public class WindowStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V
}
@Override
public StateStore build() {
public StoreBuilder<?> builder() {
final WindowBytesStoreSupplier supplier = materialized.storeSupplier() == null
? dslStoreSuppliers().windowStore(new DslWindowParams(
materialized.storeName(),
@ -85,7 +84,7 @@ public class WindowStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V
builder.withCachingEnabled();
}
return builder.build();
return builder;
}
@Override

File: StateStoreNode.java

@ -25,7 +25,7 @@ public class StateStoreNode<S extends StateStore> extends GraphNode {
protected final StoreFactory storeBuilder;
public StateStoreNode(final StoreFactory storeBuilder) {
super(storeBuilder.name());
super(storeBuilder.storeName());
this.storeBuilder = storeBuilder;
}
@ -38,7 +38,7 @@ public class StateStoreNode<S extends StateStore> extends GraphNode {
@Override
public String toString() {
return "StateStoreNode{" +
" name='" + storeBuilder.name() + '\'' +
" name='" + storeBuilder.storeName() + '\'' +
", logConfig=" + storeBuilder.logConfig() +
", loggingEnabled='" + storeBuilder.loggingEnabled() + '\'' +
"} ";

File: TableProcessorNode.java

@ -54,7 +54,7 @@ public class TableProcessorNode<K, V> extends GraphNode {
public String toString() {
return "TableProcessorNode{" +
", processorParameters=" + processorParameters +
", storeFactory=" + (storeFactory == null ? "null" : storeFactory.name()) +
", storeFactory=" + (storeFactory == null ? "null" : storeFactory.storeName()) +
", storeNames=" + Arrays.toString(storeNames) +
"} " + super.toString();
}

File: InternalTopologyBuilder.java

@ -24,7 +24,6 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.StateStore;
@ -433,7 +432,7 @@ public class InternalTopologyBuilder {
// build global state stores
for (final StoreFactory storeFactory : globalStateBuilders.values()) {
storeFactory.configure(config);
globalStateStores.put(storeFactory.name(), storeFactory.build());
globalStateStores.put(storeFactory.storeName(), storeFactory.builder().build());
}
return this;
@ -620,20 +619,20 @@ public class InternalTopologyBuilder {
final boolean allowOverride,
final String... processorNames) {
Objects.requireNonNull(storeFactory, "stateStoreFactory can't be null");
final StoreFactory stateFactory = stateFactories.get(storeFactory.name());
final StoreFactory stateFactory = stateFactories.get(storeFactory.storeName());
if (!allowOverride && stateFactory != null && !stateFactory.isCompatibleWith(storeFactory)) {
throw new TopologyException("A different StateStore has already been added with the name " + storeFactory.name());
throw new TopologyException("A different StateStore has already been added with the name " + storeFactory.storeName());
}
if (globalStateBuilders.containsKey(storeFactory.name())) {
throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeFactory.name());
if (globalStateBuilders.containsKey(storeFactory.storeName())) {
throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeFactory.storeName());
}
stateFactories.put(storeFactory.name(), storeFactory);
stateFactories.put(storeFactory.storeName(), storeFactory);
if (processorNames != null) {
for (final String processorName : processorNames) {
Objects.requireNonNull(processorName, "processor name must not be null");
connectProcessorAndStateStore(processorName, storeFactory.name());
connectProcessorAndStateStore(processorName, storeFactory.storeName());
}
}
nodeGroups = null;
@ -660,7 +659,7 @@ public class InternalTopologyBuilder {
topic,
processorName,
stateUpdateSupplier,
storeFactory.name(),
storeFactory.storeName(),
storeFactory.loggingEnabled());
validateTopicNotAlreadyRegistered(topic);
@ -682,18 +681,18 @@ public class InternalTopologyBuilder {
keyDeserializer,
valueDeserializer)
);
storeNameToReprocessOnRestore.put(storeFactory.name(),
storeNameToReprocessOnRestore.put(storeFactory.storeName(),
reprocessOnRestore ?
Optional.of(new ReprocessFactory<>(stateUpdateSupplier, keyDeserializer, valueDeserializer))
: Optional.empty());
nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
nodeGrouper.add(sourceName);
nodeFactory.addStateStore(storeFactory.name());
nodeFactory.addStateStore(storeFactory.storeName());
nodeFactories.put(processorName, nodeFactory);
nodeGrouper.add(processorName);
nodeGrouper.unite(processorName, predecessors);
globalStateBuilders.put(storeFactory.name(), storeFactory);
connectSourceStoreAndTopic(storeFactory.name(), topic);
globalStateBuilders.put(storeFactory.storeName(), storeFactory);
connectSourceStoreAndTopic(storeFactory.storeName(), topic);
nodeGroups = null;
}
@ -1158,7 +1157,7 @@ public class InternalTopologyBuilder {
if (topologyConfigs != null) {
storeFactory.configure(topologyConfigs.applicationConfigs);
}
store = storeFactory.build();
store = storeFactory.builder().build();
stateStoreMap.put(stateStoreName, store);
} else {
store = globalStateStores.get(stateStoreName);
@ -1258,8 +1257,8 @@ public class InternalTopologyBuilder {
// if the node is connected to a state store whose changelog topics are not predefined,
// add to the changelog topics
for (final StoreFactory stateFactory : stateFactories.values()) {
if (stateFactory.connectedProcessorNames().contains(node) && storeToChangelogTopic.containsKey(stateFactory.name())) {
final String topicName = storeToChangelogTopic.get(stateFactory.name());
if (stateFactory.connectedProcessorNames().contains(node) && storeToChangelogTopic.containsKey(stateFactory.storeName())) {
final String topicName = storeToChangelogTopic.get(stateFactory.storeName());
if (!stateChangelogTopics.containsKey(topicName)) {
final InternalTopicConfig internalTopicConfig =
createChangelogTopicConfig(stateFactory, topicName);

File: StoreBuilderWrapper.java

@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
@ -51,8 +50,8 @@ public class StoreBuilderWrapper implements StoreFactory {
}
@Override
public StateStore build() {
return builder.build();
public StoreBuilder<?> builder() {
return builder;
}
@Override
@ -90,7 +89,7 @@ public class StoreBuilderWrapper implements StoreFactory {
}
@Override
public String name() {
public String storeName() {
return builder.name();
}

File: StoreFactory.java

@ -52,7 +52,7 @@ public interface StoreFactory {
// do nothing
}
StateStore build();
StoreBuilder<?> builder();
long retentionPeriod();
@ -62,7 +62,7 @@ public interface StoreFactory {
boolean loggingEnabled();
String name();
String storeName();
boolean isWindowStore();
@ -132,7 +132,7 @@ public interface StoreFactory {
@SuppressWarnings("unchecked")
@Override
public T build() {
return (T) storeFactory.build();
return (T) storeFactory.builder().build();
}
@Override
@ -147,7 +147,7 @@ public interface StoreFactory {
@Override
public String name() {
return storeFactory.name();
return storeFactory.storeName();
}
}

File: InternalTopologyBuilderTest.java

@ -572,11 +572,11 @@ public class InternalTopologyBuilderTest {
assertEquals(0, builder.buildTopology().stateStores().size());
builder.connectProcessorAndStateStores("processor-1", storeFactory.name());
builder.connectProcessorAndStateStores("processor-1", storeFactory.storeName());
final List<StateStore> suppliers = builder.buildTopology().stateStores();
assertEquals(1, suppliers.size());
assertEquals(storeFactory.name(), suppliers.get(0).name());
assertEquals(storeFactory.storeName(), suppliers.get(0).name());
}
@Test
@ -586,14 +586,14 @@ public class InternalTopologyBuilderTest {
builder.addSource(null, "source-1", null, null, null, "topic-1");
builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1");
builder.connectProcessorAndStateStores("processor-1", storeFactory.name());
builder.connectProcessorAndStateStores("processor-1", storeFactory.storeName());
builder.addSource(null, "source-2", null, null, null, "topic-2");
builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-2");
builder.buildTopology();
final Set<String> stateStoreNames = builder.stateStoreNamesForSubtopology(0);
assertThat(stateStoreNames, equalTo(Set.of(storeFactory.name())));
assertThat(stateStoreNames, equalTo(Set.of(storeFactory.storeName())));
final Set<String> emptyStoreNames = builder.stateStoreNamesForSubtopology(1);
assertThat(emptyStoreNames, equalTo(Set.of()));
@ -609,11 +609,11 @@ public class InternalTopologyBuilderTest {
builder.addStateStore(storeFactory);
builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1");
builder.connectProcessorAndStateStores("processor-1", storeFactory.name());
builder.connectProcessorAndStateStores("processor-1", storeFactory.storeName());
builder.addStateStore(storeFactory);
builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-1");
builder.connectProcessorAndStateStores("processor-2", storeFactory.name());
builder.connectProcessorAndStateStores("processor-2", storeFactory.storeName());
assertEquals(1, builder.buildTopology().stateStores().size());
}
@ -1196,7 +1196,7 @@ public class InternalTopologyBuilderTest {
builder.setApplicationId("test-app");
final Map<String, List<String>> stateStoreAndTopics = builder.stateStoreNameToFullSourceTopicNames();
final List<String> topics = stateStoreAndTopics.get(storeFactory.name());
final List<String> topics = stateStoreAndTopics.get(storeFactory.storeName());
assertEquals(2, topics.size(), "Expected to contain two topics");

File: KeyValueStoreMaterializerTest.java

@ -263,7 +263,7 @@ public class KeyValueStoreMaterializerTest {
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized) {
final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
materializer.configure(streamsConfig);
return (TimestampedKeyValueStore<String, String>) ((StoreFactory) materializer).build();
return (TimestampedKeyValueStore<String, String>) materializer.builder().build();
}
@SuppressWarnings("unchecked")
@ -271,6 +271,6 @@ public class KeyValueStoreMaterializerTest {
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized) {
final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
materializer.configure(streamsConfig);
return (VersionedKeyValueStore<String, String>) ((StoreFactory) materializer).build();
return (VersionedKeyValueStore<String, String>) materializer.builder().build();
}
}

File: TestUtils.java

@ -119,7 +119,7 @@ public class TestUtils {
public static StoreFactory mockStoreFactory(final String name) {
final StoreFactory storeFactory = Mockito.mock(StoreFactory.class);
Mockito.when(storeFactory.name()).thenReturn(name);
Mockito.when(storeFactory.storeName()).thenReturn(name);
return storeFactory;
}