From 31d97bc3c99f543d7a3ca148361e5f346c50fde3 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Tue, 3 Dec 2024 22:18:55 -0800 Subject: [PATCH] KAFKA-18026: KIP-1112, skip re-registering aggregate stores in StatefulProcessorNode (#18015) Minor followup to #17929 based on this discussion Also includes some very minor refactoring/renaming on the side. The only real change is in the KGroupedStreamImpl class Reviewers: Guozhang Wang --- .../CogroupedStreamAggregateBuilder.java | 18 +++++------ .../GroupedStreamAggregateBuilder.java | 6 ++-- .../kstream/internals/KGroupedStreamImpl.java | 31 ++++++++++++------- .../kstream/internals/KStreamAggregate.java | 13 +------- .../kstream/internals/KStreamImplJoin.java | 18 +++++------ .../kstream/internals/KStreamReduce.java | 8 ++--- .../KStreamSessionWindowAggregate.java | 2 +- .../KStreamSlidingWindowAggregate.java | 2 +- .../internals/KStreamWindowAggregate.java | 2 +- .../internals/KeyValueStoreMaterializer.java | 5 ++- .../internals/MaterializedStoreFactory.java | 20 +++++++++++- .../OuterStreamJoinStoreFactory.java | 7 ++--- .../internals/SessionStoreMaterializer.java | 5 ++- .../SlidingWindowStoreMaterializer.java | 5 ++- .../internals/StreamJoinedStoreFactory.java | 7 ++--- .../internals/SubscriptionStoreFactory.java | 7 ++--- .../internals/WindowStoreMaterializer.java | 5 ++- .../internals/graph/StateStoreNode.java | 4 +-- .../internals/graph/TableProcessorNode.java | 2 +- .../internals/InternalTopologyBuilder.java | 31 +++++++++---------- .../internals/StoreBuilderWrapper.java | 7 ++--- .../processor/internals/StoreFactory.java | 8 ++--- .../InternalTopologyBuilderTest.java | 14 ++++----- .../KeyValueStoreMaterializerTest.java | 4 +-- .../apache/kafka/streams/utils/TestUtils.java | 2 +- 25 files changed, 118 insertions(+), 115 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java index 5b294c39d68..126df7de17b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java @@ -58,7 +58,7 @@ class CogroupedStreamAggregateBuilder { final Serde valueSerde, final String queryableName, final boolean isOutputVersioned) { - processRepartitions(groupPatterns, storeFactory.name()); + processRepartitions(groupPatterns, storeFactory.storeName()); final Collection processors = new ArrayList<>(); final Collection parentProcessors = new ArrayList<>(); boolean stateCreated = false; @@ -80,7 +80,7 @@ class CogroupedStreamAggregateBuilder { processors.add(statefulProcessorNode); builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); } - return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.name()); + return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName()); } @SuppressWarnings("unchecked") @@ -92,7 +92,7 @@ class CogroupedStreamAggregateBuilder { final Serde valueSerde, final String queryableName, final Windows windows) { - processRepartitions(groupPatterns, storeFactory.name()); + processRepartitions(groupPatterns, storeFactory.storeName()); final Collection processors = new ArrayList<>(); final Collection parentProcessors = new ArrayList<>(); @@ -119,7 +119,7 @@ class CogroupedStreamAggregateBuilder { processors.add(statefulProcessorNode); builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); } - return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.name()); + return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName()); } @SuppressWarnings("unchecked") @@ -132,7 +132,7 @@ class CogroupedStreamAggregateBuilder { final String queryableName, final SessionWindows sessionWindows, final Merger sessionMerger) { - processRepartitions(groupPatterns, storeFactory.name()); + processRepartitions(groupPatterns, storeFactory.storeName()); final Collection processors = new ArrayList<>(); final Collection parentProcessors = new ArrayList<>(); boolean stateCreated = false; @@ -159,7 +159,7 @@ class CogroupedStreamAggregateBuilder { processors.add(statefulProcessorNode); builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); } - return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.name()); + return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName()); } @SuppressWarnings("unchecked") @@ -171,7 +171,7 @@ class CogroupedStreamAggregateBuilder { final Serde valueSerde, final String queryableName, final SlidingWindows slidingWindows) { - processRepartitions(groupPatterns, storeFactory.name()); + processRepartitions(groupPatterns, storeFactory.storeName()); final Collection parentProcessors = new ArrayList<>(); final Collection processors = new ArrayList<>(); boolean stateCreated = false; @@ -198,7 +198,7 @@ class CogroupedStreamAggregateBuilder { processors.add(statefulProcessorNode); builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); } - return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.name()); + return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName()); } private void processRepartitions(final Map, Aggregator> groupPatterns, @@ -279,7 +279,7 @@ class CogroupedStreamAggregateBuilder { new StatefulProcessorNode<>( processorName, new ProcessorParameters<>(kStreamAggregate, processorName), - new String[]{storeFactory.name()} + new String[]{storeFactory.storeName()} ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index 8217bd025bb..c3360c9c013 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -73,7 +73,7 @@ class GroupedStreamAggregateBuilder { final Serde keySerde, final Serde valueSerde, final boolean isOutputVersioned) { - assert queryableStoreName == null || queryableStoreName.equals(storeFactory.name()); + assert queryableStoreName == null || queryableStoreName.equals(storeFactory.storeName()); final String aggFunctionName = functionName.name(); @@ -82,7 +82,7 @@ class GroupedStreamAggregateBuilder { if (repartitionRequired) { final OptimizableRepartitionNodeBuilder repartitionNodeBuilder = optimizableRepartitionNodeBuilder(); - final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeFactory.name(); + final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeFactory.storeName(); sourceName = createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder); // First time through we need to create a repartition node. @@ -101,7 +101,7 @@ class GroupedStreamAggregateBuilder { new StatefulProcessorNode<>( aggFunctionName, new ProcessorParameters<>(aggregateSupplier, aggFunctionName), - storeFactory + new String[] {storeFactory.storeName()} ); statefulProcessorNode.setOutputVersioned(isOutputVersioned); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 256153708a0..cc335e1383d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -97,10 +97,12 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS } final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); + final KeyValueStoreMaterializer storeFactory = new KeyValueStoreMaterializer<>(materializedInternal); + return doAggregate( - new KStreamReduce<>(materializedInternal, reducer), + new KStreamReduce<>(storeFactory, reducer), name, - materializedInternal + storeFactory ); } @@ -129,10 +131,12 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS } final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); + final KeyValueStoreMaterializer storeFactory = new KeyValueStoreMaterializer<>(materializedInternal); + return doAggregate( - new KStreamAggregate<>(materializedInternal, initializer, aggregator), + new KStreamAggregate<>(storeFactory, initializer, aggregator), name, - materializedInternal + storeFactory ); } @@ -183,10 +187,12 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS } final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); + final KeyValueStoreMaterializer storeFactory = new KeyValueStoreMaterializer<>(materializedInternal); + return doAggregate( - new KStreamAggregate<>(materializedInternal, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), + new KStreamAggregate<>(storeFactory, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), name, - materializedInternal); + storeFactory); } @Override @@ -236,15 +242,16 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS private KTable doAggregate(final KStreamAggProcessorSupplier aggregateSupplier, final String functionName, - final MaterializedInternal> materializedInternal) { + final KeyValueStoreMaterializer storeFactory) { + return aggregateBuilder.build( new NamedInternal(functionName), - new KeyValueStoreMaterializer<>(materializedInternal), + storeFactory, aggregateSupplier, - materializedInternal.queryableStoreName(), - materializedInternal.keySerde(), - materializedInternal.valueSerde(), - materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier); + storeFactory.queryableStoreName(), + storeFactory.keySerde(), + storeFactory.valueSerde(), + storeFactory.storeSupplier() instanceof VersionedBytesStoreSupplier); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index abe5fd2b566..bfbd16ffae8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.processor.api.ContextualProcessor; @@ -28,7 +27,6 @@ import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; @@ -55,20 +53,11 @@ public class KStreamAggregate implements KStreamAggProcessorSupp private boolean sendOldValues = false; - KStreamAggregate(final MaterializedInternal> materialized, - final Initializer initializer, - final Aggregator aggregator) { - this.storeFactory = new KeyValueStoreMaterializer<>(materialized); - this.storeName = materialized.storeName(); - this.initializer = initializer; - this.aggregator = aggregator; - } - KStreamAggregate(final StoreFactory storeFactory, final Initializer initializer, final Aggregator aggregator) { this.storeFactory = storeFactory; - this.storeName = storeFactory.name(); + this.storeName = storeFactory.storeName(); this.initializer = initializer; this.aggregator = aggregator; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java index 12bb6c19db8..d9008e81c8d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java @@ -146,16 +146,16 @@ class KStreamImplJoin { otherWindowStore = joinWindowStoreBuilderFromSupplier(otherStoreSupplier, streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde()); } - final KStreamJoinWindow thisWindowedStream = new KStreamJoinWindow<>(thisWindowStore.name()); + final KStreamJoinWindow thisWindowedStream = new KStreamJoinWindow<>(thisWindowStore.storeName()); final ProcessorParameters thisWindowStreamProcessorParams = new ProcessorParameters<>(thisWindowedStream, thisWindowStreamProcessorName); - final ProcessorGraphNode thisWindowedStreamsNode = new WindowedStreamProcessorNode<>(thisWindowStore.name(), thisWindowStreamProcessorParams); + final ProcessorGraphNode thisWindowedStreamsNode = new WindowedStreamProcessorNode<>(thisWindowStore.storeName(), thisWindowStreamProcessorParams); builder.addGraphNode(thisGraphNode, thisWindowedStreamsNode); - final KStreamJoinWindow otherWindowedStream = new KStreamJoinWindow<>(otherWindowStore.name()); + final KStreamJoinWindow otherWindowedStream = new KStreamJoinWindow<>(otherWindowStore.storeName()); final ProcessorParameters otherWindowStreamProcessorParams = new ProcessorParameters<>(otherWindowedStream, otherWindowStreamProcessorName); - final ProcessorGraphNode otherWindowedStreamsNode = new WindowedStreamProcessorNode<>(otherWindowStore.name(), otherWindowStreamProcessorParams); + final ProcessorGraphNode otherWindowedStreamsNode = new WindowedStreamProcessorNode<>(otherWindowStore.storeName(), otherWindowStreamProcessorParams); builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode); Optional outerJoinWindowStore = Optional.empty(); @@ -173,25 +173,25 @@ class KStreamImplJoin { final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows); final KStreamKStreamJoinLeftSide joinThis = new KStreamKStreamJoinLeftSide<>( - otherWindowStore.name(), + otherWindowStore.storeName(), internalWindows, joiner, leftOuter, - outerJoinWindowStore.map(StoreFactory::name), + outerJoinWindowStore.map(StoreFactory::storeName), sharedTimeTrackerSupplier ); final KStreamKStreamJoinRightSide joinOther = new KStreamKStreamJoinRightSide<>( - thisWindowStore.name(), + thisWindowStore.storeName(), internalWindows, AbstractStream.reverseJoinerWithKey(joiner), rightOuter, - outerJoinWindowStore.map(StoreFactory::name), + outerJoinWindowStore.map(StoreFactory::storeName), sharedTimeTrackerSupplier ); final KStreamKStreamSelfJoin selfJoin = new KStreamKStreamSelfJoin<>( - thisWindowStore.name(), + thisWindowStore.storeName(), internalWindows, joiner, windows.size() + windows.gracePeriodMs() diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java index f337cd9ae44..2f04a8ea65e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; @@ -27,7 +26,6 @@ import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; @@ -53,9 +51,9 @@ public class KStreamReduce implements KStreamAggProcessorSupplier> materialized, final Reducer reducer) { - this.storeFactory = new KeyValueStoreMaterializer<>(materialized); - this.storeName = materialized.storeName(); + KStreamReduce(final StoreFactory storeFactory, final Reducer reducer) { + this.storeFactory = storeFactory; + this.storeName = storeFactory.storeName(); this.reducer = reducer; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index 4a8040a8d37..f3ca9e6740a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -75,7 +75,7 @@ public class KStreamSessionWindowAggregate implements KStreamAgg final Aggregator aggregator, final Merger sessionMerger) { this.windows = windows; - this.storeName = storeFactory.name(); + this.storeName = storeFactory.storeName(); this.storeFactory = storeFactory; this.emitStrategy = emitStrategy; this.initializer = initializer; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java index 894657da48c..93935cbc1f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java @@ -64,7 +64,7 @@ public class KStreamSlidingWindowAggregate implements KStreamAgg final Initializer initializer, final Aggregator aggregator) { this.windows = windows; - this.storeName = storeFactory.name(); + this.storeName = storeFactory.storeName(); this.storeFactory = storeFactory; this.initializer = initializer; this.aggregator = aggregator; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index 2e6147627e9..adb174c4ccd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -63,7 +63,7 @@ public class KStreamWindowAggregate implements final Initializer initializer, final Aggregator aggregator) { this.windows = windows; - this.storeName = storeFactory.name(); + this.storeName = storeFactory.storeName(); this.storeFactory = storeFactory; this.emitStrategy = emitStrategy; this.initializer = initializer; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java index 3927e95c25b..d59d34e0e90 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.DslKeyValueParams; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; @@ -44,7 +43,7 @@ public class KeyValueStoreMaterializer extends MaterializedStoreFactory builder() { final KeyValueBytesStoreSupplier supplier = materialized.storeSupplier() == null ? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName(), true)) : (KeyValueBytesStoreSupplier) materialized.storeSupplier(); @@ -77,7 +76,7 @@ public class KeyValueStoreMaterializer extends MaterializedStoreFactory exten } @Override - public String name() { + public String storeName() { return materialized.storeName(); } + public String queryableStoreName() { + return materialized.queryableStoreName(); + } + + public Serde keySerde() { + return materialized.keySerde(); + } + + public Serde valueSerde() { + return materialized.valueSerde(); + } + + public StoreSupplier storeSupplier() { + return materialized.storeSupplier(); + } + @Override public Map logConfig() { return materialized.logConfig(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java index d864698408b..645858d1a65 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.kstream.JoinWindows; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.state.DslKeyValueParams; import org.apache.kafka.streams.state.DslStoreSuppliers; @@ -73,7 +72,7 @@ public class OuterStreamJoinStoreFactory extends AbstractConfigurable } @Override - public StateStore build() { + public StoreBuilder builder() { final Duration retentionPeriod = Duration.ofMillis(retentionPeriod()); final Duration windowSize = Duration.ofMillis(windows.size()); final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); @@ -135,7 +134,7 @@ public class OuterStreamJoinStoreFactory extends AbstractConfigurable builder.withLoggingDisabled(); } - return builder.build(); + return builder; } @Override @@ -155,7 +154,7 @@ public class OuterStreamJoinStoreFactory extends AbstractConfigurable } @Override - public String name() { + public String storeName() { return name; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java index 9f63b3fc279..a5317f48880 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.EmitStrategy; import org.apache.kafka.streams.kstream.SessionWindows; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.DslSessionParams; import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; @@ -58,7 +57,7 @@ public class SessionStoreMaterializer extends MaterializedStoreFactory builder() { final SessionBytesStoreSupplier supplier = materialized.storeSupplier() == null ? dslStoreSuppliers().sessionStore(new DslSessionParams( materialized.storeName(), @@ -85,7 +84,7 @@ public class SessionStoreMaterializer extends MaterializedStoreFactory extends MaterializedStoreFacto } @Override - public StateStore build() { + public StoreBuilder builder() { final WindowBytesStoreSupplier supplier = materialized.storeSupplier() == null ? dslStoreSuppliers().windowStore(new DslWindowParams( materialized.storeName(), @@ -91,7 +90,7 @@ public class SlidingWindowStoreMaterializer extends MaterializedStoreFacto builder.withCachingDisabled(); } - return builder.build(); + return builder; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java index b6e969572c8..4da99a71d61 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.EmitStrategy; import org.apache.kafka.streams.kstream.JoinWindows; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.state.DslWindowParams; import org.apache.kafka.streams.state.StoreBuilder; @@ -81,7 +80,7 @@ public class StreamJoinedStoreFactory extends AbstractConfigurableSto } @Override - public StateStore build() { + public StoreBuilder builder() { final WindowBytesStoreSupplier supplier = storeSupplier == null ? dslStoreSuppliers().windowStore(new DslWindowParams( this.name, @@ -106,7 +105,7 @@ public class StreamJoinedStoreFactory extends AbstractConfigurableSto builder.withLoggingDisabled(); } - return builder.build(); + return builder; } @Override @@ -126,7 +125,7 @@ public class StreamJoinedStoreFactory extends AbstractConfigurableSto } @Override - public String name() { + public String storeName() { return name; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java index f3c424efb3e..10c8a5e110c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java @@ -20,7 +20,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.state.DslKeyValueParams; import org.apache.kafka.streams.state.StoreBuilder; @@ -45,7 +44,7 @@ public class SubscriptionStoreFactory extends AbstractConfigurableStoreFactor } @Override - public StateStore build() { + public StoreBuilder builder() { StoreBuilder builder; builder = Stores.timestampedKeyValueStoreBuilder( dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name, true)), @@ -58,7 +57,7 @@ public class SubscriptionStoreFactory extends AbstractConfigurableStoreFactor builder = builder.withLoggingDisabled(); } builder = builder.withCachingDisabled(); - return builder.build(); + return builder; } @Override @@ -78,7 +77,7 @@ public class SubscriptionStoreFactory extends AbstractConfigurableStoreFactor } @Override - public String name() { + public String storeName() { return name; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java index eabce874f70..2b9f3d33814 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.EmitStrategy; import org.apache.kafka.streams.kstream.Windows; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.DslWindowParams; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -56,7 +55,7 @@ public class WindowStoreMaterializer extends MaterializedStoreFactory builder() { final WindowBytesStoreSupplier supplier = materialized.storeSupplier() == null ? dslStoreSuppliers().windowStore(new DslWindowParams( materialized.storeName(), @@ -85,7 +84,7 @@ public class WindowStoreMaterializer extends MaterializedStoreFactory extends GraphNode { protected final StoreFactory storeBuilder; public StateStoreNode(final StoreFactory storeBuilder) { - super(storeBuilder.name()); + super(storeBuilder.storeName()); this.storeBuilder = storeBuilder; } @@ -38,7 +38,7 @@ public class StateStoreNode extends GraphNode { @Override public String toString() { return "StateStoreNode{" + - " name='" + storeBuilder.name() + '\'' + + " name='" + storeBuilder.storeName() + '\'' + ", logConfig=" + storeBuilder.logConfig() + ", loggingEnabled='" + storeBuilder.loggingEnabled() + '\'' + "} "; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java index ccd87855a07..b47252068e6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java @@ -54,7 +54,7 @@ public class TableProcessorNode extends GraphNode { public String toString() { return "TableProcessorNode{" + ", processorParameters=" + processorParameters + - ", storeFactory=" + (storeFactory == null ? "null" : storeFactory.name()) + + ", storeFactory=" + (storeFactory == null ? "null" : storeFactory.storeName()) + ", storeNames=" + Arrays.toString(storeNames) + "} " + super.toString(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 9f65a415d95..eeb076fc0cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyConfig; -import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.processor.StateStore; @@ -433,7 +432,7 @@ public class InternalTopologyBuilder { // build global state stores for (final StoreFactory storeFactory : globalStateBuilders.values()) { storeFactory.configure(config); - globalStateStores.put(storeFactory.name(), storeFactory.build()); + globalStateStores.put(storeFactory.storeName(), storeFactory.builder().build()); } return this; @@ -620,20 +619,20 @@ public class InternalTopologyBuilder { final boolean allowOverride, final String... processorNames) { Objects.requireNonNull(storeFactory, "stateStoreFactory can't be null"); - final StoreFactory stateFactory = stateFactories.get(storeFactory.name()); + final StoreFactory stateFactory = stateFactories.get(storeFactory.storeName()); if (!allowOverride && stateFactory != null && !stateFactory.isCompatibleWith(storeFactory)) { - throw new TopologyException("A different StateStore has already been added with the name " + storeFactory.name()); + throw new TopologyException("A different StateStore has already been added with the name " + storeFactory.storeName()); } - if (globalStateBuilders.containsKey(storeFactory.name())) { - throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeFactory.name()); + if (globalStateBuilders.containsKey(storeFactory.storeName())) { + throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeFactory.storeName()); } - stateFactories.put(storeFactory.name(), storeFactory); + stateFactories.put(storeFactory.storeName(), storeFactory); if (processorNames != null) { for (final String processorName : processorNames) { Objects.requireNonNull(processorName, "processor name must not be null"); - connectProcessorAndStateStore(processorName, storeFactory.name()); + connectProcessorAndStateStore(processorName, storeFactory.storeName()); } } nodeGroups = null; @@ -660,7 +659,7 @@ public class InternalTopologyBuilder { topic, processorName, stateUpdateSupplier, - storeFactory.name(), + storeFactory.storeName(), storeFactory.loggingEnabled()); validateTopicNotAlreadyRegistered(topic); @@ -682,18 +681,18 @@ public class InternalTopologyBuilder { keyDeserializer, valueDeserializer) ); - storeNameToReprocessOnRestore.put(storeFactory.name(), + storeNameToReprocessOnRestore.put(storeFactory.storeName(), reprocessOnRestore ? Optional.of(new ReprocessFactory<>(stateUpdateSupplier, keyDeserializer, valueDeserializer)) : Optional.empty()); nodeToSourceTopics.put(sourceName, Arrays.asList(topics)); nodeGrouper.add(sourceName); - nodeFactory.addStateStore(storeFactory.name()); + nodeFactory.addStateStore(storeFactory.storeName()); nodeFactories.put(processorName, nodeFactory); nodeGrouper.add(processorName); nodeGrouper.unite(processorName, predecessors); - globalStateBuilders.put(storeFactory.name(), storeFactory); - connectSourceStoreAndTopic(storeFactory.name(), topic); + globalStateBuilders.put(storeFactory.storeName(), storeFactory); + connectSourceStoreAndTopic(storeFactory.storeName(), topic); nodeGroups = null; } @@ -1158,7 +1157,7 @@ public class InternalTopologyBuilder { if (topologyConfigs != null) { storeFactory.configure(topologyConfigs.applicationConfigs); } - store = storeFactory.build(); + store = storeFactory.builder().build(); stateStoreMap.put(stateStoreName, store); } else { store = globalStateStores.get(stateStoreName); @@ -1258,8 +1257,8 @@ public class InternalTopologyBuilder { // if the node is connected to a state store whose changelog topics are not predefined, // add to the changelog topics for (final StoreFactory stateFactory : stateFactories.values()) { - if (stateFactory.connectedProcessorNames().contains(node) && storeToChangelogTopic.containsKey(stateFactory.name())) { - final String topicName = storeToChangelogTopic.get(stateFactory.name()); + if (stateFactory.connectedProcessorNames().contains(node) && storeToChangelogTopic.containsKey(stateFactory.storeName())) { + final String topicName = storeToChangelogTopic.get(stateFactory.storeName()); if (!stateChangelogTopics.containsKey(topicName)) { final InternalTopicConfig internalTopicConfig = createChangelogTopicConfig(stateFactory, topicName); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java index 4648533af1d..61345d7da9b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.SessionStoreBuilder; import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; @@ -51,8 +50,8 @@ public class StoreBuilderWrapper implements StoreFactory { } @Override - public StateStore build() { - return builder.build(); + public StoreBuilder builder() { + return builder; } @Override @@ -90,7 +89,7 @@ public class StoreBuilderWrapper implements StoreFactory { } @Override - public String name() { + public String storeName() { return builder.name(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java index 7542f4c5bd8..ef6df04c0d6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java @@ -52,7 +52,7 @@ public interface StoreFactory { // do nothing } - StateStore build(); + StoreBuilder builder(); long retentionPeriod(); @@ -62,7 +62,7 @@ public interface StoreFactory { boolean loggingEnabled(); - String name(); + String storeName(); boolean isWindowStore(); @@ -132,7 +132,7 @@ public interface StoreFactory { @SuppressWarnings("unchecked") @Override public T build() { - return (T) storeFactory.build(); + return (T) storeFactory.builder().build(); } @Override @@ -147,7 +147,7 @@ public interface StoreFactory { @Override public String name() { - return storeFactory.name(); + return storeFactory.storeName(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index e3add9755ae..b0afe364985 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -572,11 +572,11 @@ public class InternalTopologyBuilderTest { assertEquals(0, builder.buildTopology().stateStores().size()); - builder.connectProcessorAndStateStores("processor-1", storeFactory.name()); + builder.connectProcessorAndStateStores("processor-1", storeFactory.storeName()); final List suppliers = builder.buildTopology().stateStores(); assertEquals(1, suppliers.size()); - assertEquals(storeFactory.name(), suppliers.get(0).name()); + assertEquals(storeFactory.storeName(), suppliers.get(0).name()); } @Test @@ -586,14 +586,14 @@ public class InternalTopologyBuilderTest { builder.addSource(null, "source-1", null, null, null, "topic-1"); builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1"); - builder.connectProcessorAndStateStores("processor-1", storeFactory.name()); + builder.connectProcessorAndStateStores("processor-1", storeFactory.storeName()); builder.addSource(null, "source-2", null, null, null, "topic-2"); builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-2"); builder.buildTopology(); final Set stateStoreNames = builder.stateStoreNamesForSubtopology(0); - assertThat(stateStoreNames, equalTo(Set.of(storeFactory.name()))); + assertThat(stateStoreNames, equalTo(Set.of(storeFactory.storeName()))); final Set emptyStoreNames = builder.stateStoreNamesForSubtopology(1); assertThat(emptyStoreNames, equalTo(Set.of())); @@ -609,11 +609,11 @@ public class InternalTopologyBuilderTest { builder.addStateStore(storeFactory); builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1"); - builder.connectProcessorAndStateStores("processor-1", storeFactory.name()); + builder.connectProcessorAndStateStores("processor-1", storeFactory.storeName()); builder.addStateStore(storeFactory); builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-1"); - builder.connectProcessorAndStateStores("processor-2", storeFactory.name()); + builder.connectProcessorAndStateStores("processor-2", storeFactory.storeName()); assertEquals(1, builder.buildTopology().stateStores().size()); } @@ -1196,7 +1196,7 @@ public class InternalTopologyBuilderTest { builder.setApplicationId("test-app"); final Map> stateStoreAndTopics = builder.stateStoreNameToFullSourceTopicNames(); - final List topics = stateStoreAndTopics.get(storeFactory.name()); + final List topics = stateStoreAndTopics.get(storeFactory.storeName()); assertEquals(2, topics.size(), "Expected to contain two topics"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java index 85aa8b5e21f..7228496dd36 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java @@ -263,7 +263,7 @@ public class KeyValueStoreMaterializerTest { final MaterializedInternal> materialized) { final KeyValueStoreMaterializer materializer = new KeyValueStoreMaterializer<>(materialized); materializer.configure(streamsConfig); - return (TimestampedKeyValueStore) ((StoreFactory) materializer).build(); + return (TimestampedKeyValueStore) materializer.builder().build(); } @SuppressWarnings("unchecked") @@ -271,6 +271,6 @@ public class KeyValueStoreMaterializerTest { final MaterializedInternal> materialized) { final KeyValueStoreMaterializer materializer = new KeyValueStoreMaterializer<>(materialized); materializer.configure(streamsConfig); - return (VersionedKeyValueStore) ((StoreFactory) materializer).build(); + return (VersionedKeyValueStore) materializer.builder().build(); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java index 6ecc6b7c7ac..89a229f7ddc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java @@ -119,7 +119,7 @@ public class TestUtils { public static StoreFactory mockStoreFactory(final String name) { final StoreFactory storeFactory = Mockito.mock(StoreFactory.class); - Mockito.when(storeFactory.name()).thenReturn(name); + Mockito.when(storeFactory.storeName()).thenReturn(name); return storeFactory; }