diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java index 161c4a85c52..5b294c39d68 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java @@ -58,14 +58,14 @@ class CogroupedStreamAggregateBuilder { final Serde valueSerde, final String queryableName, final boolean isOutputVersioned) { - processRepartitions(groupPatterns, storeFactory); + processRepartitions(groupPatterns, storeFactory.name()); final Collection processors = new ArrayList<>(); final Collection parentProcessors = new ArrayList<>(); boolean stateCreated = false; int counter = 0; for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { final KStreamAggProcessorSupplier parentProcessor = - new KStreamAggregate<>(storeFactory.name(), initializer, kGroupedStream.getValue()); + new KStreamAggregate<>(storeFactory, initializer, kGroupedStream.getValue()); parentProcessors.add(parentProcessor); final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode( named.suffixWithOrElseGet( @@ -92,7 +92,7 @@ class CogroupedStreamAggregateBuilder { final Serde valueSerde, final String queryableName, final Windows windows) { - processRepartitions(groupPatterns, storeFactory); + processRepartitions(groupPatterns, storeFactory.name()); final Collection processors = new ArrayList<>(); final Collection parentProcessors = new ArrayList<>(); @@ -102,7 +102,7 @@ class CogroupedStreamAggregateBuilder { final KStreamAggProcessorSupplier parentProcessor = (KStreamAggProcessorSupplier) new KStreamWindowAggregate( windows, - storeFactory.name(), + storeFactory, EmitStrategy.onWindowUpdate(), initializer, kGroupedStream.getValue()); @@ -132,7 +132,7 @@ class CogroupedStreamAggregateBuilder { final String queryableName, final SessionWindows sessionWindows, final Merger sessionMerger) { - processRepartitions(groupPatterns, storeFactory); + processRepartitions(groupPatterns, storeFactory.name()); final Collection processors = new ArrayList<>(); final Collection parentProcessors = new ArrayList<>(); boolean stateCreated = false; @@ -141,7 +141,7 @@ class CogroupedStreamAggregateBuilder { final KStreamAggProcessorSupplier parentProcessor = (KStreamAggProcessorSupplier) new KStreamSessionWindowAggregate( sessionWindows, - storeFactory.name(), + storeFactory, EmitStrategy.onWindowUpdate(), initializer, kGroupedStream.getValue(), @@ -171,7 +171,7 @@ class CogroupedStreamAggregateBuilder { final Serde valueSerde, final String queryableName, final SlidingWindows slidingWindows) { - processRepartitions(groupPatterns, storeFactory); + processRepartitions(groupPatterns, storeFactory.name()); final Collection parentProcessors = new ArrayList<>(); final Collection processors = new ArrayList<>(); boolean stateCreated = false; @@ -180,7 +180,7 @@ class CogroupedStreamAggregateBuilder { final KStreamAggProcessorSupplier parentProcessor = (KStreamAggProcessorSupplier) new KStreamSlidingWindowAggregate( slidingWindows, - storeFactory.name(), + storeFactory, // TODO: We do not have other emit policies for co-group yet EmitStrategy.onWindowUpdate(), initializer, @@ -202,7 +202,7 @@ class CogroupedStreamAggregateBuilder { } private void processRepartitions(final Map, Aggregator> groupPatterns, - final StoreFactory storeFactory) { + final String storeName) { for (final KGroupedStreamImpl repartitionReqs : groupPatterns.keySet()) { if (repartitionReqs.repartitionRequired) { @@ -210,7 +210,7 @@ class CogroupedStreamAggregateBuilder { final OptimizableRepartitionNodeBuilder repartitionNodeBuilder = optimizableRepartitionNodeBuilder(); final String repartitionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ? - repartitionReqs.userProvidedRepartitionTopicName : storeFactory.name(); + repartitionReqs.userProvidedRepartitionTopicName : storeName; createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 2ed1854d9cc..256153708a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -98,7 +98,7 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); return doAggregate( - new KStreamReduce<>(materializedInternal.storeName(), reducer), + new KStreamReduce<>(materializedInternal, reducer), name, materializedInternal ); @@ -130,7 +130,7 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); return doAggregate( - new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator), + new KStreamAggregate<>(materializedInternal, initializer, aggregator), name, materializedInternal ); @@ -184,7 +184,7 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); return doAggregate( - new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), + new KStreamAggregate<>(materializedInternal, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), name, materializedInternal); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index d03cb65c021..e500582244b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -91,7 +91,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>( funcName, new ProcessorParameters<>(aggregateSupplier, funcName), - new KeyValueStoreMaterializer<>(materialized) + new String[]{materialized.storeName()} ); statefulProcessorNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier); @@ -148,7 +148,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr materializedInternal.withValueSerde(valueSerde); } final ProcessorSupplier, K, Change> aggregateSupplier = new KTableReduce<>( - materializedInternal.storeName(), + materializedInternal, adder, subtractor); return doAggregate(aggregateSupplier, new NamedInternal(named), REDUCE_NAME, materializedInternal); @@ -179,7 +179,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr } final ProcessorSupplier, K, Change> aggregateSupplier = new KTableAggregate<>( - materializedInternal.storeName(), + materializedInternal, countInitializer, countAdder, countSubtractor); @@ -224,7 +224,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr materializedInternal.withKeySerde(keySerde); } final ProcessorSupplier, K, Change> aggregateSupplier = new KTableAggregate<>( - materializedInternal.storeName(), + materializedInternal, initializer, adder, subtractor); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 3e906813d84..abe5fd2b566 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.processor.api.ContextualProcessor; @@ -24,13 +25,20 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.Set; + import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; @@ -41,19 +49,35 @@ public class KStreamAggregate implements KStreamAggProcessorSupp private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class); private final String storeName; + private final StoreFactory storeFactory; private final Initializer initializer; private final Aggregator aggregator; private boolean sendOldValues = false; - KStreamAggregate(final String storeName, + KStreamAggregate(final MaterializedInternal> materialized, final Initializer initializer, final Aggregator aggregator) { - this.storeName = storeName; + this.storeFactory = new KeyValueStoreMaterializer<>(materialized); + this.storeName = materialized.storeName(); this.initializer = initializer; this.aggregator = aggregator; } + KStreamAggregate(final StoreFactory storeFactory, + final Initializer initializer, + final Aggregator aggregator) { + this.storeFactory = storeFactory; + this.storeName = storeFactory.name(); + this.initializer = initializer; + this.aggregator = aggregator; + } + + @Override + public Set> stores() { + return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory)); + } + @Override public Processor> get() { return new KStreamAggregateProcessor(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java index 15528f5d150..f337cd9ae44 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java @@ -17,19 +17,27 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.Set; + import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; @@ -40,15 +48,23 @@ public class KStreamReduce implements KStreamAggProcessorSupplier reducer; private boolean sendOldValues = false; - KStreamReduce(final String storeName, final Reducer reducer) { - this.storeName = storeName; + KStreamReduce(final MaterializedInternal> materialized, final Reducer reducer) { + this.storeFactory = new KeyValueStoreMaterializer<>(materialized); + this.storeName = materialized.storeName(); this.reducer = reducer; } + @Override + public Set> stores() { + return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory)); + } + + @Override public Processor> get() { return new KStreamReduceProcessor(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index 8f2c53c8a9a..4a8040a8d37 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -33,16 +33,21 @@ import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Set; import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION; import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor; @@ -54,6 +59,7 @@ public class KStreamSessionWindowAggregate implements KStreamAgg private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class); private final String storeName; + private final StoreFactory storeFactory; private final SessionWindows windows; private final Initializer initializer; private final Aggregator aggregator; @@ -63,19 +69,25 @@ public class KStreamSessionWindowAggregate implements KStreamAgg private boolean sendOldValues = false; public KStreamSessionWindowAggregate(final SessionWindows windows, - final String storeName, + final StoreFactory storeFactory, final EmitStrategy emitStrategy, final Initializer initializer, final Aggregator aggregator, final Merger sessionMerger) { this.windows = windows; - this.storeName = storeName; + this.storeName = storeFactory.name(); + this.storeFactory = storeFactory; this.emitStrategy = emitStrategy; this.initializer = initializer; this.aggregator = aggregator; this.sessionMerger = sessionMerger; } + @Override + public Set> stores() { + return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory)); + } + @Override public Processor, Change> get() { return new KStreamSessionWindowAggregateProcessor(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java index 4a288bb0e83..894657da48c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java @@ -28,7 +28,10 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowStoreIterator; @@ -36,6 +39,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -46,6 +50,7 @@ public class KStreamSlidingWindowAggregate implements KStreamAgg private static final Logger log = LoggerFactory.getLogger(KStreamSlidingWindowAggregate.class); private final String storeName; + private final StoreFactory storeFactory; private final SlidingWindows windows; private final Initializer initializer; private final Aggregator aggregator; @@ -54,17 +59,23 @@ public class KStreamSlidingWindowAggregate implements KStreamAgg private boolean sendOldValues = false; public KStreamSlidingWindowAggregate(final SlidingWindows windows, - final String storeName, + final StoreFactory storeFactory, final EmitStrategy emitStrategy, final Initializer initializer, final Aggregator aggregator) { this.windows = windows; - this.storeName = storeName; + this.storeName = storeFactory.name(); + this.storeFactory = storeFactory; this.initializer = initializer; this.aggregator = aggregator; this.emitStrategy = emitStrategy; } + @Override + public Set> stores() { + return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory)); + } + @Override public Processor, Change> get() { return new KStreamSlidingWindowAggregateProcessor(storeName, emitStrategy, sendOldValues); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index 340ce82d856..2e6147627e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -29,13 +29,18 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.Map; +import java.util.Set; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; @@ -44,6 +49,7 @@ public class KStreamWindowAggregate implements private static final Logger log = LoggerFactory.getLogger(KStreamWindowAggregate.class); private final String storeName; + private final StoreFactory storeFactory; private final Windows windows; private final Initializer initializer; private final Aggregator aggregator; @@ -52,12 +58,13 @@ public class KStreamWindowAggregate implements private boolean sendOldValues = false; public KStreamWindowAggregate(final Windows windows, - final String storeName, + final StoreFactory storeFactory, final EmitStrategy emitStrategy, final Initializer initializer, final Aggregator aggregator) { this.windows = windows; - this.storeName = storeName; + this.storeName = storeFactory.name(); + this.storeFactory = storeFactory; this.emitStrategy = emitStrategy; this.initializer = initializer; this.aggregator = aggregator; @@ -70,6 +77,11 @@ public class KStreamWindowAggregate implements } } + @Override + public Set> stores() { + return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory)); + } + @Override public Processor, Change> get() { return new KStreamWindowAggregateProcessor(storeName, emitStrategy, sendOldValues); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java index f71143ff209..cecb8048634 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java @@ -16,15 +16,23 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; +import java.util.Collections; +import java.util.Set; + import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST; @@ -33,17 +41,19 @@ public class KTableAggregate implements KTableProcessorSupplier { private final String storeName; + private final StoreFactory storeFactory; private final Initializer initializer; private final Aggregator add; private final Aggregator remove; private boolean sendOldValues = false; - KTableAggregate(final String storeName, + KTableAggregate(final MaterializedInternal> materialized, final Initializer initializer, final Aggregator add, final Aggregator remove) { - this.storeName = storeName; + this.storeFactory = new KeyValueStoreMaterializer<>(materialized); + this.storeName = materialized.storeName(); this.initializer = initializer; this.add = add; this.remove = remove; @@ -56,6 +66,11 @@ public class KTableAggregate implements return true; } + @Override + public Set> stores() { + return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory)); + } + @Override public Processor, KIn, Change> get() { return new KTableAggregateProcessor(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java index c577d30d984..d0b35098abe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java @@ -16,14 +16,22 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; +import java.util.Collections; +import java.util.Set; + import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST; @@ -31,13 +39,17 @@ import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_ public class KTableReduce implements KTableProcessorSupplier { private final String storeName; + private final StoreFactory storeFactory; private final Reducer addReducer; private final Reducer removeReducer; private boolean sendOldValues = false; - KTableReduce(final String storeName, final Reducer addReducer, final Reducer removeReducer) { - this.storeName = storeName; + KTableReduce(final MaterializedInternal> materialized, + final Reducer addReducer, + final Reducer removeReducer) { + this.storeFactory = new KeyValueStoreMaterializer<>(materialized); + this.storeName = materialized.storeName(); this.addReducer = addReducer; this.removeReducer = removeReducer; } @@ -49,6 +61,11 @@ public class KTableReduce implements KTableProcessorSupplier { return true; } + @Override + public Set> stores() { + return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory)); + } + @Override public Processor, K, Change> get() { return new KTableReduceProcessor(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index d8f3770b79a..989984d42f8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.WindowedSerdes; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; +import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.state.SessionStore; import java.util.Objects; @@ -108,12 +109,14 @@ public class SessionWindowedKStreamImpl extends AbstractStream imple } final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); + final StoreFactory storeFactory = new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy); + return aggregateBuilder.build( new NamedInternal(aggregateName), - new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy), + storeFactory, new KStreamSessionWindowAggregate<>( windows, - materializedInternal.storeName(), + storeFactory, emitStrategy, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, @@ -158,12 +161,14 @@ public class SessionWindowedKStreamImpl extends AbstractStream imple } final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); + final StoreFactory storeFactory = new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy); + return aggregateBuilder.build( new NamedInternal(reduceName), - new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy), + storeFactory, new KStreamSessionWindowAggregate<>( windows, - materializedInternal.storeName(), + storeFactory, emitStrategy, aggregateBuilder.reduceInitializer, reduceAggregator, @@ -216,13 +221,14 @@ public class SessionWindowedKStreamImpl extends AbstractStream imple } final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); + final StoreFactory storeFactory = new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy); return aggregateBuilder.build( new NamedInternal(aggregateName), - new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy), + storeFactory, new KStreamSessionWindowAggregate<>( windows, - materializedInternal.storeName(), + storeFactory, emitStrategy, initializer, aggregator, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java index 3cb7b3f29bd..16b2d0185ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; +import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.state.WindowStore; import java.util.Objects; @@ -90,11 +91,12 @@ public class SlidingWindowedKStreamImpl extends AbstractStream imple } final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); + final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); return aggregateBuilder.build( new NamedInternal(aggregateName), - new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy), - new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), + storeFactory, + new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), materializedInternal.queryableStoreName(), materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null, materializedInternal.valueSerde(), @@ -135,11 +137,12 @@ public class SlidingWindowedKStreamImpl extends AbstractStream imple materializedInternal.withKeySerde(keySerde); } final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); + final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); return aggregateBuilder.build( new NamedInternal(aggregateName), - new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy), - new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, initializer, aggregator), + storeFactory, + new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, initializer, aggregator), materializedInternal.queryableStoreName(), materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null, materializedInternal.valueSerde(), @@ -181,11 +184,12 @@ public class SlidingWindowedKStreamImpl extends AbstractStream imple } final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); + final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); return aggregateBuilder.build( new NamedInternal(reduceName), - new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy), - new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)), + storeFactory, + new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)), materializedInternal.queryableStoreName(), materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null, materializedInternal.valueSerde(), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index b615e20714b..80a671abdc5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; +import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.state.WindowStore; import java.util.Objects; @@ -102,13 +103,14 @@ public class TimeWindowedKStreamImpl extends AbstractStr } final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); + final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); return aggregateBuilder.build( new NamedInternal(aggregateName), - new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy), + storeFactory, new KStreamWindowAggregate<>( windows, - materializedInternal.storeName(), + storeFactory, emitStrategy, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), @@ -154,13 +156,14 @@ public class TimeWindowedKStreamImpl extends AbstractStr } final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); + final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); return aggregateBuilder.build( new NamedInternal(aggregateName), - new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy), + storeFactory, new KStreamWindowAggregate<>( windows, - materializedInternal.storeName(), + storeFactory, emitStrategy, initializer, aggregator), @@ -205,13 +208,14 @@ public class TimeWindowedKStreamImpl extends AbstractStr } final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); + final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); return aggregateBuilder.build( new NamedInternal(reduceName), - new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy), + storeFactory, new KStreamWindowAggregate<>( windows, - materializedInternal.storeName(), + storeFactory, emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)), diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 5210dd0b3c6..36559b18e29 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -35,7 +36,9 @@ import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Printed; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.SessionWindows; +import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -47,6 +50,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; @@ -56,6 +60,7 @@ import org.apache.kafka.streams.state.internals.RocksDBStore; import org.apache.kafka.streams.state.internals.RocksDBWindowStore; import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper; +import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockPredicate; @@ -76,7 +81,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -88,6 +92,8 @@ import static java.util.Arrays.asList; import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1; +import static org.apache.kafka.streams.state.Stores.inMemoryKeyValueStore; +import static org.apache.kafka.streams.state.Stores.timestampedKeyValueStoreBuilder; import static org.apache.kafka.streams.utils.TestUtils.PROCESSOR_WRAPPER_COUNTER_CONFIG; import static org.apache.kafka.streams.utils.TestUtils.dummyStreamsConfigMap; import static org.hamcrest.CoreMatchers.equalTo; @@ -124,7 +130,7 @@ public class StreamsBuilderTest { final StreamsBuilder builder = new StreamsBuilder(); builder.addGlobalStore( Stores.keyValueStoreBuilder( - Stores.inMemoryKeyValueStore("store"), + inMemoryKeyValueStore("store"), Serdes.String(), Serdes.String() ), @@ -1384,7 +1390,7 @@ public class StreamsBuilderTest { @Test public void shouldUseSpecifiedNameForGlobalStoreProcessor() { builder.addGlobalStore(Stores.keyValueStoreBuilder( - Stores.inMemoryKeyValueStore("store"), + inMemoryKeyValueStore("store"), Serdes.String(), Serdes.String() ), @@ -1401,7 +1407,7 @@ public class StreamsBuilderTest { @Test public void shouldUseDefaultNameForGlobalStoreProcessor() { builder.addGlobalStore(Stores.keyValueStoreBuilder( - Stores.inMemoryKeyValueStore("store"), + inMemoryKeyValueStore("store"), Serdes.String(), Serdes.String() ), @@ -1420,8 +1426,8 @@ public class StreamsBuilderTest { final Map props = dummyStreamsConfigMap(); props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); - final Set wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); - props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); @@ -1430,35 +1436,247 @@ public class StreamsBuilderTest { // call to fail final Random random = new Random(); - builder.stream("input") - .process((ProcessorSupplier) () -> record -> System.out.println("Processing: " + random.nextInt()), Named.as("processor1")) - .processValues(() -> record -> System.out.println("Processing: " + random.nextInt()), Named.as("processor2")) - .to("output"); + final StoreBuilder store = timestampedKeyValueStoreBuilder(inMemoryKeyValueStore("store"), Serdes.String(), Serdes.String()); + builder.stream("input", Consumed.as("source")) + .process( + new ProcessorSupplier<>() { + @Override + public Processor get() { + return record -> System.out.println("Processing: " + random.nextInt()); + } + + @Override + public Set> stores() { + return Collections.singleton(store); + } + }, + Named.as("stateful-process-1")) + .process( + new ProcessorSupplier<>() { + @Override + public Processor get() { + return record -> System.out.println("Processing: " + random.nextInt()); + } + + @Override + public Set> stores() { + return Collections.singleton(store); + } + }, + Named.as("stateful-process-2")) + .processValues( + () -> record -> System.out.println("Processing values: " + random.nextInt()), + Named.as("stateless-processValues")) + .to("output", Produced.as("sink")); builder.build(); - assertThat(wrappedProcessors.size(), CoreMatchers.is(2)); - assertThat(wrappedProcessors, Matchers.containsInAnyOrder("processor1", "processor2")); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(3)); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( + "stateful-process-1", "stateful-process-2", "stateless-processValues")); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2)); } @Test - public void shouldWrapProcessorsForAggregationOperators() { + public void shouldWrapProcessorsForStreamReduce() { final Map props = dummyStreamsConfigMap(); props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); - final Set wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); - props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); - builder.stream("input") + builder.stream("input", Consumed.as("source")) + .groupBy(KeyValue::new, Grouped.as("groupBy")) // wrapped 1 & 2 (implicit selectKey & repartition) + .reduce((l, r) -> l, Named.as("reduce"), Materialized.as("store")) // wrapped 3 + .toStream(Named.as("toStream"))// wrapped 4 + .to("output", Produced.as("sink")); + + builder.build(); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( + "groupBy", "groupBy-repartition-filter", "reduce", "toStream")); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4)); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1)); + } + + @Test + public void shouldWrapProcessorsForStreamAggregate() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.stream("input", Consumed.as("source")) .groupByKey() .count(Named.as("count")) // wrapped 1 .toStream(Named.as("toStream"))// wrapped 2 - .to("output"); + .to("output", Produced.as("sink")); builder.build(); - assertThat(wrappedProcessors.size(), CoreMatchers.is(2)); - assertThat(wrappedProcessors, Matchers.containsInAnyOrder("count", "toStream")); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2)); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("count", "toStream")); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1)); + } + + @Test + public void shouldWrapProcessorsForTimeWindowStreamAggregate() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.stream("input", Consumed.as("source")) + .groupByKey() + .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1))) + .count(Named.as("count")) // wrapped 1 + .toStream(Named.as("toStream"))// wrapped 2 + .to("output", Produced.as("sink")); + + builder.build(); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2)); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("count", "toStream")); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1)); + } + + @Test + public void shouldWrapProcessorsForSlidingWindowStreamAggregate() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.stream("input", Consumed.as("source")) + .groupByKey() + .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(1))) + .count(Named.as("count")) // wrapped 1 + .toStream(Named.as("toStream"))// wrapped 2 + .to("output", Produced.as("sink")); + + builder.build(); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2)); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("count", "toStream")); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1)); + } + + @Test + public void shouldWrapProcessorsForSessionWindowStreamAggregate() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.stream("input", Consumed.as("source")) + .groupByKey() + .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofDays(1))) + .count(Named.as("count")) // wrapped 1 + .toStream(Named.as("toStream"))// wrapped 2 + .to("output", Produced.as("sink")); + + builder.build(); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2)); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("count", "toStream")); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1)); + } + + @Test + public void shouldWrapProcessorsForCoGroupedStreamAggregate() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + final KStream stream1 = builder.stream("one", Consumed.as("source-1")); + final KStream stream2 = builder.stream("two", Consumed.as("source-2")); + + final KGroupedStream grouped1 = stream1.groupByKey(Grouped.as("groupByKey-1")); + final KGroupedStream grouped2 = stream2.groupByKey(Grouped.as("groupByKey-2")); + + grouped1 + .cogroup((k, v, a) -> a + v) // wrapped 1 + .cogroup(grouped2, (k, v, a) -> a + v) // wrapped 2 + .aggregate(() -> "", Named.as("aggregate"), Materialized.as("store")) // wrapped 3, store 1 + .toStream(Named.as("toStream"))// wrapped 4 + .to("output", Produced.as("sink")); + + final var top = builder.build(); + System.out.println(top.describe()); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( + "aggregate-cogroup-agg-0", "aggregate-cogroup-agg-1", "aggregate-cogroup-merge", "toStream" + )); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4)); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2)); + } + + @Test + public void shouldWrapProcessorsForTableAggregate() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.table("input", Consumed.as("source-table")) // wrapped 1, store 1 + .groupBy(KeyValue::new, Grouped.as("groupBy")) // wrapped 2 (implicit selectKey) + .count(Named.as("count")) // wrapped 3, store 2 + .toStream(Named.as("toStream"))// wrapped 4 + .to("output", Produced.as("sink")); + + builder.build(); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( + "source-table", "groupBy", "count", "toStream" + )); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4)); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2)); + } + + @Test + public void shouldWrapProcessorsForTableReduce() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.table("input", Consumed.as("source-table")) // wrapped 1, store 1 + .groupBy(KeyValue::new, Grouped.as("groupBy")) // wrapped 2 (implicit selectKey) + .reduce((l, r) -> "", (l, r) -> "", Named.as("reduce"), Materialized.as("store")) // wrapped 3, store 2 + .toStream(Named.as("toStream"))// wrapped 4 + .to("output", Produced.as("sink")); + + builder.build(); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( + "source-table", "groupBy", "reduce", "toStream" + )); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4)); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2)); } @Test @@ -1466,49 +1684,76 @@ public class StreamsBuilderTest { final Map props = dummyStreamsConfigMap(); props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); - final Set wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); - props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); - builder.stream("input") - .filter((k, v) -> true, Named.as("filter")) // wrapped 1 + builder.stream("input", Consumed.as("source")) + .filter((k, v) -> true, Named.as("filter-stream")) // wrapped 1 .map(KeyValue::new, Named.as("map")) // wrapped 2 .selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3 .peek((k, v) -> { }, Named.as("peek")) // wrapped 4 .flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // wrapped 5 - .toTable(Named.as("toTable")) // wrapped 6 (note named as toTable-repartition-filter) + .toTable(Named.as("toTable")) // should be wrapped when we do StreamToTableNode + .filter((k, v) -> true, Named.as("filter-table")) // should be wrapped once we do TableProcessorNode .toStream(Named.as("toStream")) // wrapped 7 - .to("output"); + .to("output", Produced.as("sink")); builder.build(); - assertThat(wrappedProcessors.size(), CoreMatchers.is(7)); - assertThat(wrappedProcessors, Matchers.containsInAnyOrder( - "filter", "map", "selectKey", "peek", "flatMap", "toTable-repartition-filter", - "toStream" + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(7)); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( + "filter-stream", "map", "selectKey", "peek", "flatMap", + "toTable-repartition-filter", "toStream" )); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0)); } @Test - public void shouldWrapProcessorsForTableSource() { + public void shouldWrapProcessorsForUnmaterializedSourceTable() { final Map props = dummyStreamsConfigMap(); props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); - final Set wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); - props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); - builder.table("input") // wrapped 1 (named KTABLE_SOURCE-0000000002) - .toStream(Named.as("toStream")) // wrapped 2 - .to("output"); + builder.table("input", Consumed.as("source")) // wrapped 1 + .toStream(Named.as("toStream")) // wrapped 2 + .to("output", Produced.as("sink")); builder.build(); - assertThat(wrappedProcessors.size(), CoreMatchers.is(2)); - assertThat(wrappedProcessors, Matchers.containsInAnyOrder( - "KTABLE-SOURCE-0000000002", - "toStream" + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2)); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( + "source", "toStream" )); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0)); + } + + @Test + public void shouldWrapProcessorsForMaterializedSourceTable() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.table("input", Consumed.as("source"), Materialized.as("store")) // wrapped 1 + .toStream(Named.as("toStream")) // wrapped 2 + .to("output", Produced.as("sink")); + + builder.build(); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2)); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( + "source", "toStream" + )); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index 833eada8d5e..f43747d3cf2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -46,6 +46,7 @@ import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper; +import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.MockProcessorSupplier; @@ -2427,8 +2428,8 @@ public class TopologyTest { final Map props = dummyStreamsConfigMap(); props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); - final Set wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); - props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); final Topology topology = new Topology(new TopologyConfig(new StreamsConfig(props))); @@ -2453,8 +2454,8 @@ public class TopologyTest { () -> (Processor) record -> System.out.println("Processing: " + random.nextInt()), "p2" ); - assertThat(wrappedProcessors.size(), is(3)); - assertThat(wrappedProcessors, Matchers.containsInAnyOrder("p1", "p2", "p3")); + assertThat(counter.numWrappedProcessors(), is(3)); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("p1", "p2", "p3")); } @SuppressWarnings("deprecation") diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index adf7b32c708..f9503fd1564 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -64,6 +64,7 @@ import java.util.stream.Collectors; import static java.time.Duration.ofMillis; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.utils.TestUtils.mockStoreFactory; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.is; @@ -126,7 +127,7 @@ public class KStreamSessionWindowAggregateProcessorTest { sessionAggregator = new KStreamSessionWindowAggregate<>( SessionWindows.ofInactivityGapWithNoGrace(ofMillis(GAP_MS)), - STORE_NAME, + mockStoreFactory(STORE_NAME), emitStrategy, initializer, aggregator, @@ -484,7 +485,7 @@ public class KStreamSessionWindowAggregateProcessorTest { setup(inputType, false); final Processor, Change> processor = new KStreamSessionWindowAggregate<>( SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(0L)), - STORE_NAME, + mockStoreFactory(STORE_NAME), EmitStrategy.onWindowUpdate(), initializer, aggregator, @@ -551,7 +552,7 @@ public class KStreamSessionWindowAggregateProcessorTest { setup(inputType, false); final Processor, Change> processor = new KStreamSessionWindowAggregate<>( SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1L)), - STORE_NAME, + mockStoreFactory(STORE_NAME), EmitStrategy.onWindowUpdate(), initializer, aggregator, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 14b77a7cc30..c4fdf3ad86a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -46,6 +46,7 @@ import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForwa import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; @@ -76,6 +77,7 @@ import static java.time.Duration.ofMillis; import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.utils.TestUtils.mockStoreFactory; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.hasItems; @@ -90,6 +92,7 @@ public class KStreamWindowAggregateTest { private static final String WINDOW_STORE_NAME = "dummy-store-name"; private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private final String threadId = Thread.currentThread().getName(); + private final StoreFactory storeFactory = mockStoreFactory(WINDOW_STORE_NAME); public StrategyType type; @@ -646,7 +649,7 @@ public class KStreamWindowAggregateTest { final MockInternalNewProcessorContext, Change> context = makeContext(stateDir, windowSize); final KStreamWindowAggregate processorSupplier = new KStreamWindowAggregate<>( windows, - WINDOW_STORE_NAME, + storeFactory, emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER @@ -736,7 +739,7 @@ public class KStreamWindowAggregateTest { final MockInternalNewProcessorContext, Change> context = makeContext(stateDir, windowSize); final KStreamWindowAggregate processorSupplier = new KStreamWindowAggregate<>( windows, - WINDOW_STORE_NAME, + storeFactory, emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER @@ -805,7 +808,7 @@ public class KStreamWindowAggregateTest { final MockInternalNewProcessorContext, Change> context = makeContext(stateDir, windowSize); final KStreamWindowAggregate processorSupplier = new KStreamWindowAggregate<>( windows, - WINDOW_STORE_NAME, + storeFactory, emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER @@ -906,7 +909,7 @@ public class KStreamWindowAggregateTest { final MockInternalNewProcessorContext, Change> context = makeContext(stateDir, windowSize); final KStreamWindowAggregate processorSupplier = new KStreamWindowAggregate<>( windows, - WINDOW_STORE_NAME, + storeFactory, emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER @@ -982,7 +985,7 @@ public class KStreamWindowAggregateTest { final IllegalArgumentException e = assertThrows( IllegalArgumentException.class, () -> new KStreamWindowAggregate<>( UnlimitedWindows.of(), - WINDOW_STORE_NAME, + storeFactory, emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER) @@ -992,7 +995,7 @@ public class KStreamWindowAggregateTest { } else { new KStreamWindowAggregate<>( UnlimitedWindows.of(), - WINDOW_STORE_NAME, + storeFactory, emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java index de437ed66a5..5f1c8489dab 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.ProcessorNode; @@ -42,7 +43,7 @@ public class KTableReduceTest { final Processor>, String, Change>> reduceProcessor = new KTableReduce>( - "myStore", + new MaterializedInternal<>(Materialized.as("myStore")), this::unionNotNullArgs, this::differenceNotNullArgs ).get(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java index 1541dce30ba..8552790692c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.internals.StoreFactory; import org.junit.jupiter.api.Test; import static java.time.Duration.ofMillis; +import static org.apache.kafka.streams.utils.TestUtils.mockStoreFactory; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.fail; @@ -82,7 +83,7 @@ public class GraphGraceSearchUtilTest { new ProcessorParameters<>( new KStreamWindowAggregate( windows, - "asdf", + mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null @@ -105,7 +106,7 @@ public class GraphGraceSearchUtilTest { new ProcessorParameters<>( new KStreamSessionWindowAggregate( windows, - "asdf", + mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null, @@ -126,7 +127,7 @@ public class GraphGraceSearchUtilTest { final StatefulProcessorNode graceGrandparent = new StatefulProcessorNode<>( "asdf", new ProcessorParameters<>(new KStreamSessionWindowAggregate( - windows, "asdf", EmitStrategy.onWindowUpdate(), null, null, null + windows, mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null, null ), "asdf"), (StoreFactory) null ); @@ -161,7 +162,7 @@ public class GraphGraceSearchUtilTest { new ProcessorParameters<>( new KStreamSessionWindowAggregate( windows, - "asdf", + mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null, @@ -189,7 +190,7 @@ public class GraphGraceSearchUtilTest { new ProcessorParameters<>( new KStreamSessionWindowAggregate( SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)), - "asdf", + mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null, @@ -205,7 +206,7 @@ public class GraphGraceSearchUtilTest { new ProcessorParameters<>( new KStreamWindowAggregate( TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)), - "asdf", + mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java index 24ac2f2306d..6ecc6b7c7ac 100644 --- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java @@ -18,13 +18,19 @@ package org.apache.kafka.streams.utils; import org.apache.kafka.common.Uuid; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; +import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.ProcessorWrapper; import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier; import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder; import org.junit.jupiter.api.TestInfo; +import org.mockito.Mockito; import java.lang.reflect.Method; import java.time.Duration; @@ -43,7 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat; public class TestUtils { - public static final String PROCESSOR_WRAPPER_COUNTER_CONFIG = "wrapped.processor.count"; + public static final String PROCESSOR_WRAPPER_COUNTER_CONFIG = "wrapped.counter"; /** * Waits for the given {@link KafkaStreams} instances to all be in a specific {@link KafkaStreams.State}. @@ -111,6 +117,12 @@ public class TestUtils { return baseConfigs; } + public static StoreFactory mockStoreFactory(final String name) { + final StoreFactory storeFactory = Mockito.mock(StoreFactory.class); + Mockito.when(storeFactory.name()).thenReturn(name); + return storeFactory; + } + /** * Simple pass-through processor wrapper that counts the number of processors * it wraps. @@ -119,29 +131,142 @@ public class TestUtils { */ public static class RecordingProcessorWrapper implements ProcessorWrapper { - private Set wrappedProcessorNames; + private WrapperRecorder recorder; @Override public void configure(final Map configs) { if (configs.containsKey(PROCESSOR_WRAPPER_COUNTER_CONFIG)) { - wrappedProcessorNames = (Set) configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG); + recorder = (WrapperRecorder) configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG); } else { - wrappedProcessorNames = Collections.synchronizedSet(new HashSet<>()); + recorder = new WrapperRecorder(); } } + public static class WrapperRecorder { + private final Set uniqueStores = new HashSet<>(); + private final Set processorStoresCounted = new HashSet<>(); + private final Set wrappedProcessorNames = Collections.synchronizedSet(new HashSet<>()); + + public void wrapProcessorSupplier(final String name) { + wrappedProcessorNames.add(name); + } + + public void wrapStateStore(final String processorName, final String storeName) { + if (!uniqueStores.contains(storeName)) { + uniqueStores.add(storeName); + } + + final String processorStoreKey = processorName + storeName; + if (!processorStoresCounted.contains(processorStoreKey)) { + processorStoresCounted.add(processorStoreKey); + } + } + + public int numWrappedProcessors() { + return wrappedProcessorNames.size(); + } + + // Number of unique state stores in the topology connected to their processors via the + // ProcessorSupplier#stores method. State stores connected to more than one processor are + // counted only once + public int numUniqueStateStores() { + return uniqueStores.size(); + } + + // Number of stores connected to a processor via the ProcessorSupplier#stores method (ie the size + // of the set returned by #stores), summed across all processors in the topology. + // Equal to the number of unique - + // pairings. Will be greater than or equal to the value of #numUniqueStateStores, as this method + // will double count any stores connected to more than one processor + public int numConnectedStateStores() { + return processorStoresCounted.size(); + } + + public Set wrappedProcessorNames() { + return wrappedProcessorNames; + } + + } + @Override public WrappedProcessorSupplier wrapProcessorSupplier(final String processorName, final ProcessorSupplier processorSupplier) { - wrappedProcessorNames.add(processorName); - return ProcessorWrapper.asWrapped(processorSupplier); + + return new CountingDelegatingProcessorSupplier<>(recorder, processorName, processorSupplier); } @Override public WrappedFixedKeyProcessorSupplier wrapFixedKeyProcessorSupplier(final String processorName, final FixedKeyProcessorSupplier processorSupplier) { - wrappedProcessorNames.add(processorName); - return ProcessorWrapper.asWrappedFixedKey(processorSupplier); + return new CountingDelegatingFixedKeyProcessorSupplier<>(recorder, processorName, processorSupplier); + } + } + + private static class CountingDelegatingProcessorSupplier + implements WrappedProcessorSupplier { + + private final WrapperRecorder counter; + private final String processorName; + private final ProcessorSupplier delegate; + + public CountingDelegatingProcessorSupplier(final WrapperRecorder counter, + final String processorName, + final ProcessorSupplier processorSupplier) { + this.counter = counter; + this.processorName = processorName; + this.delegate = processorSupplier; + + counter.wrapProcessorSupplier(processorName); + } + + @Override + public Set> stores() { + final Set> stores = delegate.stores(); + if (stores != null) { + for (final StoreBuilder store : stores) { + counter.wrapStateStore(processorName, store.name()); + } + } + return stores; + } + + @Override + public Processor get() { + return delegate.get(); + } + } + + private static class CountingDelegatingFixedKeyProcessorSupplier + implements WrappedFixedKeyProcessorSupplier { + + private final WrapperRecorder counter; + private final String processorName; + private final FixedKeyProcessorSupplier delegate; + + public CountingDelegatingFixedKeyProcessorSupplier(final WrapperRecorder counter, + final String processorName, + final FixedKeyProcessorSupplier processorSupplier) { + this.counter = counter; + this.processorName = processorName; + this.delegate = processorSupplier; + + counter.wrapProcessorSupplier(processorName); + } + + @Override + public Set> stores() { + final Set> stores = delegate.stores(); + if (stores != null) { + for (final StoreBuilder store : stores) { + counter.wrapStateStore(processorName, store.name()); + } + } + return stores; + } + + @Override + public FixedKeyProcessor get() { + return delegate.get(); } } }