KAFKA-18026: migrate KStream and KTable aggregates to use ProcesserSupplier#stores (#17929)

As part of KIP-1112, to maximize the utility of the new ProcessorWrapper, we need to migrate the DSL operators to the new method of attaching state stores by implementing ProcessorSupplier#stores, which makes these stores available for inspection by the user's wrapper.

This PR covers the aggregate operator for both KStream and KTable.


Reviewers: Guozhang Wang <guozhang.wang.us@gmail.com>, Rohan Desai <rohan@responsive.dev>
This commit is contained in:
A. Sophie Blee-Goldman 2024-12-03 02:09:43 -08:00 committed by GitHub
parent f87c04854b
commit 184b64fb41
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 614 additions and 116 deletions

View File

@ -58,14 +58,14 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde, final Serde<VOut> valueSerde,
final String queryableName, final String queryableName,
final boolean isOutputVersioned) { final boolean isOutputVersioned) {
processRepartitions(groupPatterns, storeFactory); processRepartitions(groupPatterns, storeFactory.name());
final Collection<GraphNode> processors = new ArrayList<>(); final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>(); final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false; boolean stateCreated = false;
int counter = 0; int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) { for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor = final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
new KStreamAggregate<>(storeFactory.name(), initializer, kGroupedStream.getValue()); new KStreamAggregate<>(storeFactory, initializer, kGroupedStream.getValue());
parentProcessors.add(parentProcessor); parentProcessors.add(parentProcessor);
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode( final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
named.suffixWithOrElseGet( named.suffixWithOrElseGet(
@ -92,7 +92,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde, final Serde<VOut> valueSerde,
final String queryableName, final String queryableName,
final Windows<W> windows) { final Windows<W> windows) {
processRepartitions(groupPatterns, storeFactory); processRepartitions(groupPatterns, storeFactory.name());
final Collection<GraphNode> processors = new ArrayList<>(); final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>(); final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
@ -102,7 +102,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor = final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
(KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamWindowAggregate<K, K, VOut, W>( (KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamWindowAggregate<K, K, VOut, W>(
windows, windows,
storeFactory.name(), storeFactory,
EmitStrategy.onWindowUpdate(), EmitStrategy.onWindowUpdate(),
initializer, initializer,
kGroupedStream.getValue()); kGroupedStream.getValue());
@ -132,7 +132,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final String queryableName, final String queryableName,
final SessionWindows sessionWindows, final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) { final Merger<? super K, VOut> sessionMerger) {
processRepartitions(groupPatterns, storeFactory); processRepartitions(groupPatterns, storeFactory.name());
final Collection<GraphNode> processors = new ArrayList<>(); final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>(); final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false; boolean stateCreated = false;
@ -141,7 +141,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor = final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
(KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamSessionWindowAggregate<K, K, VOut>( (KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamSessionWindowAggregate<K, K, VOut>(
sessionWindows, sessionWindows,
storeFactory.name(), storeFactory,
EmitStrategy.onWindowUpdate(), EmitStrategy.onWindowUpdate(),
initializer, initializer,
kGroupedStream.getValue(), kGroupedStream.getValue(),
@ -171,7 +171,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde, final Serde<VOut> valueSerde,
final String queryableName, final String queryableName,
final SlidingWindows slidingWindows) { final SlidingWindows slidingWindows) {
processRepartitions(groupPatterns, storeFactory); processRepartitions(groupPatterns, storeFactory.name());
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>(); final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
final Collection<GraphNode> processors = new ArrayList<>(); final Collection<GraphNode> processors = new ArrayList<>();
boolean stateCreated = false; boolean stateCreated = false;
@ -180,7 +180,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor = final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
(KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamSlidingWindowAggregate<K, K, VOut>( (KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamSlidingWindowAggregate<K, K, VOut>(
slidingWindows, slidingWindows,
storeFactory.name(), storeFactory,
// TODO: We do not have other emit policies for co-group yet // TODO: We do not have other emit policies for co-group yet
EmitStrategy.onWindowUpdate(), EmitStrategy.onWindowUpdate(),
initializer, initializer,
@ -202,7 +202,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
} }
private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns, private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final StoreFactory storeFactory) { final String storeName) {
for (final KGroupedStreamImpl<K, ?> repartitionReqs : groupPatterns.keySet()) { for (final KGroupedStreamImpl<K, ?> repartitionReqs : groupPatterns.keySet()) {
if (repartitionReqs.repartitionRequired) { if (repartitionReqs.repartitionRequired) {
@ -210,7 +210,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final OptimizableRepartitionNodeBuilder<K, ?> repartitionNodeBuilder = optimizableRepartitionNodeBuilder(); final OptimizableRepartitionNodeBuilder<K, ?> repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
final String repartitionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ? final String repartitionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ?
repartitionReqs.userProvidedRepartitionTopicName : storeFactory.name(); repartitionReqs.userProvidedRepartitionTopicName : storeName;
createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde); createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde);

View File

@ -98,7 +98,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
return doAggregate( return doAggregate(
new KStreamReduce<>(materializedInternal.storeName(), reducer), new KStreamReduce<>(materializedInternal, reducer),
name, name,
materializedInternal materializedInternal
); );
@ -130,7 +130,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
return doAggregate( return doAggregate(
new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator), new KStreamAggregate<>(materializedInternal, initializer, aggregator),
name, name,
materializedInternal materializedInternal
); );
@ -184,7 +184,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
return doAggregate( return doAggregate(
new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), new KStreamAggregate<>(materializedInternal, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
name, name,
materializedInternal); materializedInternal);
} }

View File

@ -91,7 +91,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>( final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>(
funcName, funcName,
new ProcessorParameters<>(aggregateSupplier, funcName), new ProcessorParameters<>(aggregateSupplier, funcName),
new KeyValueStoreMaterializer<>(materialized) new String[]{materialized.storeName()}
); );
statefulProcessorNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier); statefulProcessorNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier);
@ -148,7 +148,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
materializedInternal.withValueSerde(valueSerde); materializedInternal.withValueSerde(valueSerde);
} }
final ProcessorSupplier<K, Change<V>, K, Change<V>> aggregateSupplier = new KTableReduce<>( final ProcessorSupplier<K, Change<V>, K, Change<V>> aggregateSupplier = new KTableReduce<>(
materializedInternal.storeName(), materializedInternal,
adder, adder,
subtractor); subtractor);
return doAggregate(aggregateSupplier, new NamedInternal(named), REDUCE_NAME, materializedInternal); return doAggregate(aggregateSupplier, new NamedInternal(named), REDUCE_NAME, materializedInternal);
@ -179,7 +179,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
} }
final ProcessorSupplier<K, Change<V>, K, Change<Long>> aggregateSupplier = new KTableAggregate<>( final ProcessorSupplier<K, Change<V>, K, Change<Long>> aggregateSupplier = new KTableAggregate<>(
materializedInternal.storeName(), materializedInternal,
countInitializer, countInitializer,
countAdder, countAdder,
countSubtractor); countSubtractor);
@ -224,7 +224,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
materializedInternal.withKeySerde(keySerde); materializedInternal.withKeySerde(keySerde);
} }
final ProcessorSupplier<K, Change<V>, K, Change<VAgg>> aggregateSupplier = new KTableAggregate<>( final ProcessorSupplier<K, Change<V>, K, Change<VAgg>> aggregateSupplier = new KTableAggregate<>(
materializedInternal.storeName(), materializedInternal,
initializer, initializer,
adder, adder,
subtractor); subtractor);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals; package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.ContextualProcessor;
@ -24,13 +25,20 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Set;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
@ -41,19 +49,35 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements KStreamAggProcessorSupp
private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class); private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class);
private final String storeName; private final String storeName;
private final StoreFactory storeFactory;
private final Initializer<VAgg> initializer; private final Initializer<VAgg> initializer;
private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator; private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
private boolean sendOldValues = false; private boolean sendOldValues = false;
KStreamAggregate(final String storeName, KStreamAggregate(final MaterializedInternal<KIn, VAgg, KeyValueStore<Bytes, byte[]>> materialized,
final Initializer<VAgg> initializer, final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) { final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
this.storeName = storeName; this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
this.storeName = materialized.storeName();
this.initializer = initializer; this.initializer = initializer;
this.aggregator = aggregator; this.aggregator = aggregator;
} }
KStreamAggregate(final StoreFactory storeFactory,
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
this.storeFactory = storeFactory;
this.storeName = storeFactory.name();
this.initializer = initializer;
this.aggregator = aggregator;
}
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory));
}
@Override @Override
public Processor<KIn, VIn, KIn, Change<VAgg>> get() { public Processor<KIn, VIn, KIn, Change<VAgg>> get() {
return new KStreamAggregateProcessor(); return new KStreamAggregateProcessor();

View File

@ -17,19 +17,27 @@
package org.apache.kafka.streams.kstream.internals; package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Set;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
@ -40,15 +48,23 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, V, K,
private static final Logger LOG = LoggerFactory.getLogger(KStreamReduce.class); private static final Logger LOG = LoggerFactory.getLogger(KStreamReduce.class);
private final String storeName; private final String storeName;
private final StoreFactory storeFactory;
private final Reducer<V> reducer; private final Reducer<V> reducer;
private boolean sendOldValues = false; private boolean sendOldValues = false;
KStreamReduce(final String storeName, final Reducer<V> reducer) { KStreamReduce(final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized, final Reducer<V> reducer) {
this.storeName = storeName; this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
this.storeName = materialized.storeName();
this.reducer = reducer; this.reducer = reducer;
} }
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory));
}
@Override @Override
public Processor<K, V, K, Change<V>> get() { public Processor<K, V, K, Change<V>> get() {
return new KStreamReduceProcessor(); return new KStreamReduceProcessor();

View File

@ -33,16 +33,21 @@ import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set;
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION; import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION;
import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor; import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor;
@ -54,6 +59,7 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class); private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class);
private final String storeName; private final String storeName;
private final StoreFactory storeFactory;
private final SessionWindows windows; private final SessionWindows windows;
private final Initializer<VAgg> initializer; private final Initializer<VAgg> initializer;
private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator; private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
@ -63,19 +69,25 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
private boolean sendOldValues = false; private boolean sendOldValues = false;
public KStreamSessionWindowAggregate(final SessionWindows windows, public KStreamSessionWindowAggregate(final SessionWindows windows,
final String storeName, final StoreFactory storeFactory,
final EmitStrategy emitStrategy, final EmitStrategy emitStrategy,
final Initializer<VAgg> initializer, final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg> aggregator, final Aggregator<? super KIn, ? super VIn, VAgg> aggregator,
final Merger<? super KIn, VAgg> sessionMerger) { final Merger<? super KIn, VAgg> sessionMerger) {
this.windows = windows; this.windows = windows;
this.storeName = storeName; this.storeName = storeFactory.name();
this.storeFactory = storeFactory;
this.emitStrategy = emitStrategy; this.emitStrategy = emitStrategy;
this.initializer = initializer; this.initializer = initializer;
this.aggregator = aggregator; this.aggregator = aggregator;
this.sessionMerger = sessionMerger; this.sessionMerger = sessionMerger;
} }
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory));
}
@Override @Override
public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() { public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() {
return new KStreamSessionWindowAggregateProcessor(); return new KStreamSessionWindowAggregateProcessor();

View File

@ -28,7 +28,10 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.WindowStoreIterator;
@ -36,6 +39,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
@ -46,6 +50,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
private static final Logger log = LoggerFactory.getLogger(KStreamSlidingWindowAggregate.class); private static final Logger log = LoggerFactory.getLogger(KStreamSlidingWindowAggregate.class);
private final String storeName; private final String storeName;
private final StoreFactory storeFactory;
private final SlidingWindows windows; private final SlidingWindows windows;
private final Initializer<VAgg> initializer; private final Initializer<VAgg> initializer;
private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator; private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
@ -54,17 +59,23 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
private boolean sendOldValues = false; private boolean sendOldValues = false;
public KStreamSlidingWindowAggregate(final SlidingWindows windows, public KStreamSlidingWindowAggregate(final SlidingWindows windows,
final String storeName, final StoreFactory storeFactory,
final EmitStrategy emitStrategy, final EmitStrategy emitStrategy,
final Initializer<VAgg> initializer, final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) { final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
this.windows = windows; this.windows = windows;
this.storeName = storeName; this.storeName = storeFactory.name();
this.storeFactory = storeFactory;
this.initializer = initializer; this.initializer = initializer;
this.aggregator = aggregator; this.aggregator = aggregator;
this.emitStrategy = emitStrategy; this.emitStrategy = emitStrategy;
} }
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory));
}
@Override @Override
public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() { public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() {
return new KStreamSlidingWindowAggregateProcessor(storeName, emitStrategy, sendOldValues); return new KStreamSlidingWindowAggregateProcessor(storeName, emitStrategy, sendOldValues);

View File

@ -29,13 +29,18 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Set;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@ -44,6 +49,7 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
private static final Logger log = LoggerFactory.getLogger(KStreamWindowAggregate.class); private static final Logger log = LoggerFactory.getLogger(KStreamWindowAggregate.class);
private final String storeName; private final String storeName;
private final StoreFactory storeFactory;
private final Windows<W> windows; private final Windows<W> windows;
private final Initializer<VAgg> initializer; private final Initializer<VAgg> initializer;
private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator; private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
@ -52,12 +58,13 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
private boolean sendOldValues = false; private boolean sendOldValues = false;
public KStreamWindowAggregate(final Windows<W> windows, public KStreamWindowAggregate(final Windows<W> windows,
final String storeName, final StoreFactory storeFactory,
final EmitStrategy emitStrategy, final EmitStrategy emitStrategy,
final Initializer<VAgg> initializer, final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) { final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
this.windows = windows; this.windows = windows;
this.storeName = storeName; this.storeName = storeFactory.name();
this.storeFactory = storeFactory;
this.emitStrategy = emitStrategy; this.emitStrategy = emitStrategy;
this.initializer = initializer; this.initializer = initializer;
this.aggregator = aggregator; this.aggregator = aggregator;
@ -70,6 +77,11 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
} }
} }
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory));
}
@Override @Override
public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() { public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() {
return new KStreamWindowAggregateProcessor(storeName, emitStrategy, sendOldValues); return new KStreamWindowAggregateProcessor(storeName, emitStrategy, sendOldValues);

View File

@ -16,15 +16,23 @@
*/ */
package org.apache.kafka.streams.kstream.internals; package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import java.util.Collections;
import java.util.Set;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST; import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
@ -33,17 +41,19 @@ public class KTableAggregate<KIn, VIn, VAgg> implements
KTableProcessorSupplier<KIn, VIn, KIn, VAgg> { KTableProcessorSupplier<KIn, VIn, KIn, VAgg> {
private final String storeName; private final String storeName;
private final StoreFactory storeFactory;
private final Initializer<VAgg> initializer; private final Initializer<VAgg> initializer;
private final Aggregator<? super KIn, ? super VIn, VAgg> add; private final Aggregator<? super KIn, ? super VIn, VAgg> add;
private final Aggregator<? super KIn, ? super VIn, VAgg> remove; private final Aggregator<? super KIn, ? super VIn, VAgg> remove;
private boolean sendOldValues = false; private boolean sendOldValues = false;
KTableAggregate(final String storeName, KTableAggregate(final MaterializedInternal<KIn, VAgg, KeyValueStore<Bytes, byte[]>> materialized,
final Initializer<VAgg> initializer, final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg> add, final Aggregator<? super KIn, ? super VIn, VAgg> add,
final Aggregator<? super KIn, ? super VIn, VAgg> remove) { final Aggregator<? super KIn, ? super VIn, VAgg> remove) {
this.storeName = storeName; this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
this.storeName = materialized.storeName();
this.initializer = initializer; this.initializer = initializer;
this.add = add; this.add = add;
this.remove = remove; this.remove = remove;
@ -56,6 +66,11 @@ public class KTableAggregate<KIn, VIn, VAgg> implements
return true; return true;
} }
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory));
}
@Override @Override
public Processor<KIn, Change<VIn>, KIn, Change<VAgg>> get() { public Processor<KIn, Change<VIn>, KIn, Change<VAgg>> get() {
return new KTableAggregateProcessor(); return new KTableAggregateProcessor();

View File

@ -16,14 +16,22 @@
*/ */
package org.apache.kafka.streams.kstream.internals; package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import java.util.Collections;
import java.util.Set;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST; import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
@ -31,13 +39,17 @@ import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_
public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, K, V> { public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, K, V> {
private final String storeName; private final String storeName;
private final StoreFactory storeFactory;
private final Reducer<V> addReducer; private final Reducer<V> addReducer;
private final Reducer<V> removeReducer; private final Reducer<V> removeReducer;
private boolean sendOldValues = false; private boolean sendOldValues = false;
KTableReduce(final String storeName, final Reducer<V> addReducer, final Reducer<V> removeReducer) { KTableReduce(final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized,
this.storeName = storeName; final Reducer<V> addReducer,
final Reducer<V> removeReducer) {
this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
this.storeName = materialized.storeName();
this.addReducer = addReducer; this.addReducer = addReducer;
this.removeReducer = removeReducer; this.removeReducer = removeReducer;
} }
@ -49,6 +61,11 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, K, V> {
return true; return true;
} }
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory));
}
@Override @Override
public Processor<K, Change<V>, K, Change<V>> get() { public Processor<K, Change<V>, K, Change<V>> get() {
return new KTableReduceProcessor(); return new KTableReduceProcessor();

View File

@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes; import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode; import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.SessionStore;
import java.util.Objects; import java.util.Objects;
@ -108,12 +109,14 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
} }
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
final StoreFactory storeFactory = new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build( return aggregateBuilder.build(
new NamedInternal(aggregateName), new NamedInternal(aggregateName),
new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy), storeFactory,
new KStreamSessionWindowAggregate<>( new KStreamSessionWindowAggregate<>(
windows, windows,
materializedInternal.storeName(), storeFactory,
emitStrategy, emitStrategy,
aggregateBuilder.countInitializer, aggregateBuilder.countInitializer,
aggregateBuilder.countAggregator, aggregateBuilder.countAggregator,
@ -158,12 +161,14 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
} }
final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
final StoreFactory storeFactory = new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build( return aggregateBuilder.build(
new NamedInternal(reduceName), new NamedInternal(reduceName),
new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy), storeFactory,
new KStreamSessionWindowAggregate<>( new KStreamSessionWindowAggregate<>(
windows, windows,
materializedInternal.storeName(), storeFactory,
emitStrategy, emitStrategy,
aggregateBuilder.reduceInitializer, aggregateBuilder.reduceInitializer,
reduceAggregator, reduceAggregator,
@ -216,13 +221,14 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
} }
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
final StoreFactory storeFactory = new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build( return aggregateBuilder.build(
new NamedInternal(aggregateName), new NamedInternal(aggregateName),
new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy), storeFactory,
new KStreamSessionWindowAggregate<>( new KStreamSessionWindowAggregate<>(
windows, windows,
materializedInternal.storeName(), storeFactory,
emitStrategy, emitStrategy,
initializer, initializer,
aggregator, aggregator,

View File

@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode; import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStore;
import java.util.Objects; import java.util.Objects;
@ -90,11 +91,12 @@ public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
} }
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build( return aggregateBuilder.build(
new NamedInternal(aggregateName), new NamedInternal(aggregateName),
new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy), storeFactory,
new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
materializedInternal.queryableStoreName(), materializedInternal.queryableStoreName(),
materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null, materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null,
materializedInternal.valueSerde(), materializedInternal.valueSerde(),
@ -135,11 +137,12 @@ public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
materializedInternal.withKeySerde(keySerde); materializedInternal.withKeySerde(keySerde);
} }
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build( return aggregateBuilder.build(
new NamedInternal(aggregateName), new NamedInternal(aggregateName),
new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy), storeFactory,
new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, initializer, aggregator), new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, initializer, aggregator),
materializedInternal.queryableStoreName(), materializedInternal.queryableStoreName(),
materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null, materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null,
materializedInternal.valueSerde(), materializedInternal.valueSerde(),
@ -181,11 +184,12 @@ public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
} }
final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build( return aggregateBuilder.build(
new NamedInternal(reduceName), new NamedInternal(reduceName),
new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy), storeFactory,
new KStreamSlidingWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)), new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
materializedInternal.queryableStoreName(), materializedInternal.queryableStoreName(),
materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null, materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null,
materializedInternal.valueSerde(), materializedInternal.valueSerde(),

View File

@ -33,6 +33,7 @@ import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode; import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStore;
import java.util.Objects; import java.util.Objects;
@ -102,13 +103,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
} }
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build( return aggregateBuilder.build(
new NamedInternal(aggregateName), new NamedInternal(aggregateName),
new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy), storeFactory,
new KStreamWindowAggregate<>( new KStreamWindowAggregate<>(
windows, windows,
materializedInternal.storeName(), storeFactory,
emitStrategy, emitStrategy,
aggregateBuilder.countInitializer, aggregateBuilder.countInitializer,
aggregateBuilder.countAggregator), aggregateBuilder.countAggregator),
@ -154,13 +156,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
} }
final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build( return aggregateBuilder.build(
new NamedInternal(aggregateName), new NamedInternal(aggregateName),
new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy), storeFactory,
new KStreamWindowAggregate<>( new KStreamWindowAggregate<>(
windows, windows,
materializedInternal.storeName(), storeFactory,
emitStrategy, emitStrategy,
initializer, initializer,
aggregator), aggregator),
@ -205,13 +208,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
} }
final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build( return aggregateBuilder.build(
new NamedInternal(reduceName), new NamedInternal(reduceName),
new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy), storeFactory,
new KStreamWindowAggregate<>( new KStreamWindowAggregate<>(
windows, windows,
materializedInternal.storeName(), storeFactory,
emitStrategy, emitStrategy,
aggregateBuilder.reduceInitializer, aggregateBuilder.reduceInitializer,
aggregatorForReducer(reducer)), aggregatorForReducer(reducer)),

View File

@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Materialized;
@ -35,7 +36,9 @@ import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Printed; import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.StreamJoined; import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorContext;
@ -47,6 +50,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers; import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
@ -56,6 +60,7 @@ import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore; import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper; import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder;
import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate; import org.apache.kafka.test.MockPredicate;
@ -76,7 +81,6 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
@ -88,6 +92,8 @@ import static java.util.Arrays.asList;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
import static org.apache.kafka.streams.state.Stores.inMemoryKeyValueStore;
import static org.apache.kafka.streams.state.Stores.timestampedKeyValueStoreBuilder;
import static org.apache.kafka.streams.utils.TestUtils.PROCESSOR_WRAPPER_COUNTER_CONFIG; import static org.apache.kafka.streams.utils.TestUtils.PROCESSOR_WRAPPER_COUNTER_CONFIG;
import static org.apache.kafka.streams.utils.TestUtils.dummyStreamsConfigMap; import static org.apache.kafka.streams.utils.TestUtils.dummyStreamsConfigMap;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
@ -124,7 +130,7 @@ public class StreamsBuilderTest {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
builder.addGlobalStore( builder.addGlobalStore(
Stores.keyValueStoreBuilder( Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("store"), inMemoryKeyValueStore("store"),
Serdes.String(), Serdes.String(),
Serdes.String() Serdes.String()
), ),
@ -1384,7 +1390,7 @@ public class StreamsBuilderTest {
@Test @Test
public void shouldUseSpecifiedNameForGlobalStoreProcessor() { public void shouldUseSpecifiedNameForGlobalStoreProcessor() {
builder.addGlobalStore(Stores.keyValueStoreBuilder( builder.addGlobalStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("store"), inMemoryKeyValueStore("store"),
Serdes.String(), Serdes.String(),
Serdes.String() Serdes.String()
), ),
@ -1401,7 +1407,7 @@ public class StreamsBuilderTest {
@Test @Test
public void shouldUseDefaultNameForGlobalStoreProcessor() { public void shouldUseDefaultNameForGlobalStoreProcessor() {
builder.addGlobalStore(Stores.keyValueStoreBuilder( builder.addGlobalStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("store"), inMemoryKeyValueStore("store"),
Serdes.String(), Serdes.String(),
Serdes.String() Serdes.String()
), ),
@ -1420,8 +1426,8 @@ public class StreamsBuilderTest {
final Map<Object, Object> props = dummyStreamsConfigMap(); final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final Set<String> wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
@ -1430,35 +1436,247 @@ public class StreamsBuilderTest {
// call to fail // call to fail
final Random random = new Random(); final Random random = new Random();
builder.stream("input") final StoreBuilder<?> store = timestampedKeyValueStoreBuilder(inMemoryKeyValueStore("store"), Serdes.String(), Serdes.String());
.process((ProcessorSupplier<Object, Object, Object, Object>) () -> record -> System.out.println("Processing: " + random.nextInt()), Named.as("processor1")) builder.stream("input", Consumed.as("source"))
.processValues(() -> record -> System.out.println("Processing: " + random.nextInt()), Named.as("processor2")) .process(
.to("output"); new ProcessorSupplier<>() {
@Override
public Processor<Object, Object, Object, Object> get() {
return record -> System.out.println("Processing: " + random.nextInt());
}
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(store);
}
},
Named.as("stateful-process-1"))
.process(
new ProcessorSupplier<>() {
@Override
public Processor<Object, Object, Object, Object> get() {
return record -> System.out.println("Processing: " + random.nextInt());
}
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(store);
}
},
Named.as("stateful-process-2"))
.processValues(
() -> record -> System.out.println("Processing values: " + random.nextInt()),
Named.as("stateless-processValues"))
.to("output", Produced.as("sink"));
builder.build(); builder.build();
assertThat(wrappedProcessors.size(), CoreMatchers.is(2)); assertThat(counter.numWrappedProcessors(), CoreMatchers.is(3));
assertThat(wrappedProcessors, Matchers.containsInAnyOrder("processor1", "processor2")); assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
"stateful-process-1", "stateful-process-2", "stateless-processValues"));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
} }
@Test @Test
public void shouldWrapProcessorsForAggregationOperators() { public void shouldWrapProcessorsForStreamReduce() {
final Map<Object, Object> props = dummyStreamsConfigMap(); final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final Set<String> wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
builder.stream("input") builder.stream("input", Consumed.as("source"))
.groupBy(KeyValue::new, Grouped.as("groupBy")) // wrapped 1 & 2 (implicit selectKey & repartition)
.reduce((l, r) -> l, Named.as("reduce"), Materialized.as("store")) // wrapped 3
.toStream(Named.as("toStream"))// wrapped 4
.to("output", Produced.as("sink"));
builder.build();
assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
"groupBy", "groupBy-repartition-filter", "reduce", "toStream"));
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
}
@Test
public void shouldWrapProcessorsForStreamAggregate() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
builder.stream("input", Consumed.as("source"))
.groupByKey() .groupByKey()
.count(Named.as("count")) // wrapped 1 .count(Named.as("count")) // wrapped 1
.toStream(Named.as("toStream"))// wrapped 2 .toStream(Named.as("toStream"))// wrapped 2
.to("output"); .to("output", Produced.as("sink"));
builder.build(); builder.build();
assertThat(wrappedProcessors.size(), CoreMatchers.is(2)); assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
assertThat(wrappedProcessors, Matchers.containsInAnyOrder("count", "toStream")); assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("count", "toStream"));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
}
@Test
public void shouldWrapProcessorsForTimeWindowStreamAggregate() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
builder.stream("input", Consumed.as("source"))
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1)))
.count(Named.as("count")) // wrapped 1
.toStream(Named.as("toStream"))// wrapped 2
.to("output", Produced.as("sink"));
builder.build();
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("count", "toStream"));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
}
@Test
public void shouldWrapProcessorsForSlidingWindowStreamAggregate() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
builder.stream("input", Consumed.as("source"))
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(1)))
.count(Named.as("count")) // wrapped 1
.toStream(Named.as("toStream"))// wrapped 2
.to("output", Produced.as("sink"));
builder.build();
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("count", "toStream"));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
}
@Test
public void shouldWrapProcessorsForSessionWindowStreamAggregate() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
builder.stream("input", Consumed.as("source"))
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofDays(1)))
.count(Named.as("count")) // wrapped 1
.toStream(Named.as("toStream"))// wrapped 2
.to("output", Produced.as("sink"));
builder.build();
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("count", "toStream"));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
}
@Test
public void shouldWrapProcessorsForCoGroupedStreamAggregate() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
final KStream<String, String> stream1 = builder.stream("one", Consumed.as("source-1"));
final KStream<String, String> stream2 = builder.stream("two", Consumed.as("source-2"));
final KGroupedStream<String, String> grouped1 = stream1.groupByKey(Grouped.as("groupByKey-1"));
final KGroupedStream<String, String> grouped2 = stream2.groupByKey(Grouped.as("groupByKey-2"));
grouped1
.cogroup((k, v, a) -> a + v) // wrapped 1
.cogroup(grouped2, (k, v, a) -> a + v) // wrapped 2
.aggregate(() -> "", Named.as("aggregate"), Materialized.as("store")) // wrapped 3, store 1
.toStream(Named.as("toStream"))// wrapped 4
.to("output", Produced.as("sink"));
final var top = builder.build();
System.out.println(top.describe());
assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
"aggregate-cogroup-agg-0", "aggregate-cogroup-agg-1", "aggregate-cogroup-merge", "toStream"
));
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
}
@Test
public void shouldWrapProcessorsForTableAggregate() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
builder.table("input", Consumed.as("source-table")) // wrapped 1, store 1
.groupBy(KeyValue::new, Grouped.as("groupBy")) // wrapped 2 (implicit selectKey)
.count(Named.as("count")) // wrapped 3, store 2
.toStream(Named.as("toStream"))// wrapped 4
.to("output", Produced.as("sink"));
builder.build();
assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
"source-table", "groupBy", "count", "toStream"
));
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
}
@Test
public void shouldWrapProcessorsForTableReduce() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
builder.table("input", Consumed.as("source-table")) // wrapped 1, store 1
.groupBy(KeyValue::new, Grouped.as("groupBy")) // wrapped 2 (implicit selectKey)
.reduce((l, r) -> "", (l, r) -> "", Named.as("reduce"), Materialized.as("store")) // wrapped 3, store 2
.toStream(Named.as("toStream"))// wrapped 4
.to("output", Produced.as("sink"));
builder.build();
assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
"source-table", "groupBy", "reduce", "toStream"
));
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
} }
@Test @Test
@ -1466,49 +1684,76 @@ public class StreamsBuilderTest {
final Map<Object, Object> props = dummyStreamsConfigMap(); final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final Set<String> wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
builder.stream("input") builder.stream("input", Consumed.as("source"))
.filter((k, v) -> true, Named.as("filter")) // wrapped 1 .filter((k, v) -> true, Named.as("filter-stream")) // wrapped 1
.map(KeyValue::new, Named.as("map")) // wrapped 2 .map(KeyValue::new, Named.as("map")) // wrapped 2
.selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3 .selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3
.peek((k, v) -> { }, Named.as("peek")) // wrapped 4 .peek((k, v) -> { }, Named.as("peek")) // wrapped 4
.flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // wrapped 5 .flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // wrapped 5
.toTable(Named.as("toTable")) // wrapped 6 (note named as toTable-repartition-filter) .toTable(Named.as("toTable")) // should be wrapped when we do StreamToTableNode
.filter((k, v) -> true, Named.as("filter-table")) // should be wrapped once we do TableProcessorNode
.toStream(Named.as("toStream")) // wrapped 7 .toStream(Named.as("toStream")) // wrapped 7
.to("output"); .to("output", Produced.as("sink"));
builder.build(); builder.build();
assertThat(wrappedProcessors.size(), CoreMatchers.is(7)); assertThat(counter.numWrappedProcessors(), CoreMatchers.is(7));
assertThat(wrappedProcessors, Matchers.containsInAnyOrder( assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
"filter", "map", "selectKey", "peek", "flatMap", "toTable-repartition-filter", "filter-stream", "map", "selectKey", "peek", "flatMap",
"toStream" "toTable-repartition-filter", "toStream"
)); ));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0));
} }
@Test @Test
public void shouldWrapProcessorsForTableSource() { public void shouldWrapProcessorsForUnmaterializedSourceTable() {
final Map<Object, Object> props = dummyStreamsConfigMap(); final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final Set<String> wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
builder.table("input") // wrapped 1 (named KTABLE_SOURCE-0000000002) builder.table("input", Consumed.as("source")) // wrapped 1
.toStream(Named.as("toStream")) // wrapped 2 .toStream(Named.as("toStream")) // wrapped 2
.to("output"); .to("output", Produced.as("sink"));
builder.build(); builder.build();
assertThat(wrappedProcessors.size(), CoreMatchers.is(2)); assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
assertThat(wrappedProcessors, Matchers.containsInAnyOrder( assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
"KTABLE-SOURCE-0000000002", "source", "toStream"
"toStream"
)); ));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0));
}
@Test
public void shouldWrapProcessorsForMaterializedSourceTable() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
builder.table("input", Consumed.as("source"), Materialized.as("store")) // wrapped 1
.toStream(Named.as("toStream")) // wrapped 2
.to("output", Produced.as("sink"));
builder.build();
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
"source", "toStream"
));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
} }
@Test @Test

View File

@ -46,6 +46,7 @@ import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper; import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder;
import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockProcessorSupplier;
@ -2427,8 +2428,8 @@ public class TopologyTest {
final Map<Object, Object> props = dummyStreamsConfigMap(); final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final Set<String> wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final Topology topology = new Topology(new TopologyConfig(new StreamsConfig(props))); final Topology topology = new Topology(new TopologyConfig(new StreamsConfig(props)));
@ -2453,8 +2454,8 @@ public class TopologyTest {
() -> (Processor<Object, Object, Object, Object>) record -> System.out.println("Processing: " + random.nextInt()), () -> (Processor<Object, Object, Object, Object>) record -> System.out.println("Processing: " + random.nextInt()),
"p2" "p2"
); );
assertThat(wrappedProcessors.size(), is(3)); assertThat(counter.numWrappedProcessors(), is(3));
assertThat(wrappedProcessors, Matchers.containsInAnyOrder("p1", "p2", "p3")); assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("p1", "p2", "p3"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")

View File

@ -64,6 +64,7 @@ import java.util.stream.Collectors;
import static java.time.Duration.ofMillis; import static java.time.Duration.ofMillis;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.utils.TestUtils.mockStoreFactory;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
@ -126,7 +127,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
sessionAggregator = new KStreamSessionWindowAggregate<>( sessionAggregator = new KStreamSessionWindowAggregate<>(
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(GAP_MS)), SessionWindows.ofInactivityGapWithNoGrace(ofMillis(GAP_MS)),
STORE_NAME, mockStoreFactory(STORE_NAME),
emitStrategy, emitStrategy,
initializer, initializer,
aggregator, aggregator,
@ -484,7 +485,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
setup(inputType, false); setup(inputType, false);
final Processor<String, String, Windowed<String>, Change<Long>> processor = new KStreamSessionWindowAggregate<>( final Processor<String, String, Windowed<String>, Change<Long>> processor = new KStreamSessionWindowAggregate<>(
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(0L)), SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(0L)),
STORE_NAME, mockStoreFactory(STORE_NAME),
EmitStrategy.onWindowUpdate(), EmitStrategy.onWindowUpdate(),
initializer, initializer,
aggregator, aggregator,
@ -551,7 +552,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
setup(inputType, false); setup(inputType, false);
final Processor<String, String, Windowed<String>, Change<Long>> processor = new KStreamSessionWindowAggregate<>( final Processor<String, String, Windowed<String>, Change<Long>> processor = new KStreamSessionWindowAggregate<>(
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1L)), SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1L)),
STORE_NAME, mockStoreFactory(STORE_NAME),
EmitStrategy.onWindowUpdate(), EmitStrategy.onWindowUpdate(),
initializer, initializer,
aggregator, aggregator,

View File

@ -46,6 +46,7 @@ import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForwa
import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
@ -76,6 +77,7 @@ import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.utils.TestUtils.mockStoreFactory;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.hasItems;
@ -90,6 +92,7 @@ public class KStreamWindowAggregateTest {
private static final String WINDOW_STORE_NAME = "dummy-store-name"; private static final String WINDOW_STORE_NAME = "dummy-store-name";
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final String threadId = Thread.currentThread().getName(); private final String threadId = Thread.currentThread().getName();
private final StoreFactory storeFactory = mockStoreFactory(WINDOW_STORE_NAME);
public StrategyType type; public StrategyType type;
@ -646,7 +649,7 @@ public class KStreamWindowAggregateTest {
final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize); final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize);
final KStreamWindowAggregate<String, String, String, TimeWindow> processorSupplier = new KStreamWindowAggregate<>( final KStreamWindowAggregate<String, String, String, TimeWindow> processorSupplier = new KStreamWindowAggregate<>(
windows, windows,
WINDOW_STORE_NAME, storeFactory,
emitStrategy, emitStrategy,
MockInitializer.STRING_INIT, MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER MockAggregator.TOSTRING_ADDER
@ -736,7 +739,7 @@ public class KStreamWindowAggregateTest {
final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize); final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize);
final KStreamWindowAggregate<String, String, String, TimeWindow> processorSupplier = new KStreamWindowAggregate<>( final KStreamWindowAggregate<String, String, String, TimeWindow> processorSupplier = new KStreamWindowAggregate<>(
windows, windows,
WINDOW_STORE_NAME, storeFactory,
emitStrategy, emitStrategy,
MockInitializer.STRING_INIT, MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER MockAggregator.TOSTRING_ADDER
@ -805,7 +808,7 @@ public class KStreamWindowAggregateTest {
final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize); final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize);
final KStreamWindowAggregate<String, String, String, TimeWindow> processorSupplier = new KStreamWindowAggregate<>( final KStreamWindowAggregate<String, String, String, TimeWindow> processorSupplier = new KStreamWindowAggregate<>(
windows, windows,
WINDOW_STORE_NAME, storeFactory,
emitStrategy, emitStrategy,
MockInitializer.STRING_INIT, MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER MockAggregator.TOSTRING_ADDER
@ -906,7 +909,7 @@ public class KStreamWindowAggregateTest {
final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize); final MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = makeContext(stateDir, windowSize);
final KStreamWindowAggregate<String, String, String, TimeWindow> processorSupplier = new KStreamWindowAggregate<>( final KStreamWindowAggregate<String, String, String, TimeWindow> processorSupplier = new KStreamWindowAggregate<>(
windows, windows,
WINDOW_STORE_NAME, storeFactory,
emitStrategy, emitStrategy,
MockInitializer.STRING_INIT, MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER MockAggregator.TOSTRING_ADDER
@ -982,7 +985,7 @@ public class KStreamWindowAggregateTest {
final IllegalArgumentException e = assertThrows( final IllegalArgumentException e = assertThrows(
IllegalArgumentException.class, () -> new KStreamWindowAggregate<>( IllegalArgumentException.class, () -> new KStreamWindowAggregate<>(
UnlimitedWindows.of(), UnlimitedWindows.of(),
WINDOW_STORE_NAME, storeFactory,
emitStrategy, emitStrategy,
MockInitializer.STRING_INIT, MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER) MockAggregator.TOSTRING_ADDER)
@ -992,7 +995,7 @@ public class KStreamWindowAggregateTest {
} else { } else {
new KStreamWindowAggregate<>( new KStreamWindowAggregate<>(
UnlimitedWindows.of(), UnlimitedWindows.of(),
WINDOW_STORE_NAME, storeFactory,
emitStrategy, emitStrategy,
MockInitializer.STRING_INIT, MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER MockAggregator.TOSTRING_ADDER

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.kafka.streams.kstream.internals; package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorNode;
@ -42,7 +43,7 @@ public class KTableReduceTest {
final Processor<String, Change<Set<String>>, String, Change<Set<String>>> reduceProcessor = final Processor<String, Change<Set<String>>, String, Change<Set<String>>> reduceProcessor =
new KTableReduce<String, Set<String>>( new KTableReduce<String, Set<String>>(
"myStore", new MaterializedInternal<>(Materialized.as("myStore")),
this::unionNotNullArgs, this::unionNotNullArgs,
this::differenceNotNullArgs this::differenceNotNullArgs
).get(); ).get();

View File

@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static java.time.Duration.ofMillis; import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.utils.TestUtils.mockStoreFactory;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
@ -82,7 +83,7 @@ public class GraphGraceSearchUtilTest {
new ProcessorParameters<>( new ProcessorParameters<>(
new KStreamWindowAggregate<String, Long, Integer, TimeWindow>( new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
windows, windows,
"asdf", mockStoreFactory("asdf"),
EmitStrategy.onWindowUpdate(), EmitStrategy.onWindowUpdate(),
null, null,
null null
@ -105,7 +106,7 @@ public class GraphGraceSearchUtilTest {
new ProcessorParameters<>( new ProcessorParameters<>(
new KStreamSessionWindowAggregate<String, Long, Integer>( new KStreamSessionWindowAggregate<String, Long, Integer>(
windows, windows,
"asdf", mockStoreFactory("asdf"),
EmitStrategy.onWindowUpdate(), EmitStrategy.onWindowUpdate(),
null, null,
null, null,
@ -126,7 +127,7 @@ public class GraphGraceSearchUtilTest {
final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>( final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
"asdf", "asdf",
new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>( new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
windows, "asdf", EmitStrategy.onWindowUpdate(), null, null, null windows, mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null, null
), "asdf"), ), "asdf"),
(StoreFactory) null (StoreFactory) null
); );
@ -161,7 +162,7 @@ public class GraphGraceSearchUtilTest {
new ProcessorParameters<>( new ProcessorParameters<>(
new KStreamSessionWindowAggregate<String, Long, Integer>( new KStreamSessionWindowAggregate<String, Long, Integer>(
windows, windows,
"asdf", mockStoreFactory("asdf"),
EmitStrategy.onWindowUpdate(), EmitStrategy.onWindowUpdate(),
null, null,
null, null,
@ -189,7 +190,7 @@ public class GraphGraceSearchUtilTest {
new ProcessorParameters<>( new ProcessorParameters<>(
new KStreamSessionWindowAggregate<String, Long, Integer>( new KStreamSessionWindowAggregate<String, Long, Integer>(
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)), SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)),
"asdf", mockStoreFactory("asdf"),
EmitStrategy.onWindowUpdate(), EmitStrategy.onWindowUpdate(),
null, null,
null, null,
@ -205,7 +206,7 @@ public class GraphGraceSearchUtilTest {
new ProcessorParameters<>( new ProcessorParameters<>(
new KStreamWindowAggregate<String, Long, Integer, TimeWindow>( new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)), TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)),
"asdf", mockStoreFactory("asdf"),
EmitStrategy.onWindowUpdate(), EmitStrategy.onWindowUpdate(),
null, null,
null null

View File

@ -18,13 +18,19 @@ package org.apache.kafka.streams.utils;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorWrapper; import org.apache.kafka.streams.processor.api.ProcessorWrapper;
import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier; import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier; import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder;
import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestInfo;
import org.mockito.Mockito;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.time.Duration; import java.time.Duration;
@ -43,7 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
public class TestUtils { public class TestUtils {
public static final String PROCESSOR_WRAPPER_COUNTER_CONFIG = "wrapped.processor.count"; public static final String PROCESSOR_WRAPPER_COUNTER_CONFIG = "wrapped.counter";
/** /**
* Waits for the given {@link KafkaStreams} instances to all be in a specific {@link KafkaStreams.State}. * Waits for the given {@link KafkaStreams} instances to all be in a specific {@link KafkaStreams.State}.
@ -111,6 +117,12 @@ public class TestUtils {
return baseConfigs; return baseConfigs;
} }
public static StoreFactory mockStoreFactory(final String name) {
final StoreFactory storeFactory = Mockito.mock(StoreFactory.class);
Mockito.when(storeFactory.name()).thenReturn(name);
return storeFactory;
}
/** /**
* Simple pass-through processor wrapper that counts the number of processors * Simple pass-through processor wrapper that counts the number of processors
* it wraps. * it wraps.
@ -119,29 +131,142 @@ public class TestUtils {
*/ */
public static class RecordingProcessorWrapper implements ProcessorWrapper { public static class RecordingProcessorWrapper implements ProcessorWrapper {
private Set<String> wrappedProcessorNames; private WrapperRecorder recorder;
@Override @Override
public void configure(final Map<String, ?> configs) { public void configure(final Map<String, ?> configs) {
if (configs.containsKey(PROCESSOR_WRAPPER_COUNTER_CONFIG)) { if (configs.containsKey(PROCESSOR_WRAPPER_COUNTER_CONFIG)) {
wrappedProcessorNames = (Set<String>) configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG); recorder = (WrapperRecorder) configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG);
} else { } else {
wrappedProcessorNames = Collections.synchronizedSet(new HashSet<>()); recorder = new WrapperRecorder();
} }
} }
public static class WrapperRecorder {
private final Set<String> uniqueStores = new HashSet<>();
private final Set<String> processorStoresCounted = new HashSet<>();
private final Set<String> wrappedProcessorNames = Collections.synchronizedSet(new HashSet<>());
public void wrapProcessorSupplier(final String name) {
wrappedProcessorNames.add(name);
}
public void wrapStateStore(final String processorName, final String storeName) {
if (!uniqueStores.contains(storeName)) {
uniqueStores.add(storeName);
}
final String processorStoreKey = processorName + storeName;
if (!processorStoresCounted.contains(processorStoreKey)) {
processorStoresCounted.add(processorStoreKey);
}
}
public int numWrappedProcessors() {
return wrappedProcessorNames.size();
}
// Number of unique state stores in the topology connected to their processors via the
// ProcessorSupplier#stores method. State stores connected to more than one processor are
// counted only once
public int numUniqueStateStores() {
return uniqueStores.size();
}
// Number of stores connected to a processor via the ProcessorSupplier#stores method (ie the size
// of the set returned by #stores), summed across all processors in the topology.
// Equal to the number of unique <processorName>-<storeName>
// pairings. Will be greater than or equal to the value of #numUniqueStateStores, as this method
// will double count any stores connected to more than one processor
public int numConnectedStateStores() {
return processorStoresCounted.size();
}
public Set<String> wrappedProcessorNames() {
return wrappedProcessorNames;
}
}
@Override @Override
public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(final String processorName, public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(final String processorName,
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) { final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
wrappedProcessorNames.add(processorName);
return ProcessorWrapper.asWrapped(processorSupplier); return new CountingDelegatingProcessorSupplier<>(recorder, processorName, processorSupplier);
} }
@Override @Override
public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(final String processorName, public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(final String processorName,
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) { final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
wrappedProcessorNames.add(processorName); return new CountingDelegatingFixedKeyProcessorSupplier<>(recorder, processorName, processorSupplier);
return ProcessorWrapper.asWrappedFixedKey(processorSupplier); }
}
private static class CountingDelegatingProcessorSupplier<KIn, VIn, KOut, VOut>
implements WrappedProcessorSupplier<KIn, VIn, KOut, VOut> {
private final WrapperRecorder counter;
private final String processorName;
private final ProcessorSupplier<KIn, VIn, KOut, VOut> delegate;
public CountingDelegatingProcessorSupplier(final WrapperRecorder counter,
final String processorName,
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
this.counter = counter;
this.processorName = processorName;
this.delegate = processorSupplier;
counter.wrapProcessorSupplier(processorName);
}
@Override
public Set<StoreBuilder<?>> stores() {
final Set<StoreBuilder<?>> stores = delegate.stores();
if (stores != null) {
for (final StoreBuilder<?> store : stores) {
counter.wrapStateStore(processorName, store.name());
}
}
return stores;
}
@Override
public Processor<KIn, VIn, KOut, VOut> get() {
return delegate.get();
}
}
private static class CountingDelegatingFixedKeyProcessorSupplier<KIn, VIn, VOut>
implements WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> {
private final WrapperRecorder counter;
private final String processorName;
private final FixedKeyProcessorSupplier<KIn, VIn, VOut> delegate;
public CountingDelegatingFixedKeyProcessorSupplier(final WrapperRecorder counter,
final String processorName,
final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
this.counter = counter;
this.processorName = processorName;
this.delegate = processorSupplier;
counter.wrapProcessorSupplier(processorName);
}
@Override
public Set<StoreBuilder<?>> stores() {
final Set<StoreBuilder<?>> stores = delegate.stores();
if (stores != null) {
for (final StoreBuilder<?> store : stores) {
counter.wrapStateStore(processorName, store.name());
}
}
return stores;
}
@Override
public FixedKeyProcessor<KIn, VIn, VOut> get() {
return delegate.get();
} }
} }
} }