From 3e6dcb14dd9716f07d1ba96b1b06b78e55e92de1 Mon Sep 17 00:00:00 2001 From: leah Date: Mon, 10 Aug 2020 17:32:44 -0500 Subject: [PATCH] MINOR: Improve checks for CogroupedStreamAggregateBuilder (#9141) Update `CogroupedStreamAggregateBuilder` to have individual builders depending on the windowed aggregation, or lack thereof. This replaced passing in all options into the builder, with all but the current type of aggregation set to null and then checking to see which value was not null. Reviewers: A. Sophie Blee-Goldman , John Roesler --- .../internals/CogroupedKStreamImpl.java | 5 +- .../CogroupedStreamAggregateBuilder.java | 170 +++++++++++------- .../SessionWindowedCogroupedKStreamImpl.java | 4 +- .../TimeWindowedCogroupedKStreamImpl.java | 22 ++- 4 files changed, 121 insertions(+), 80 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java index 41ddcddb037..86ef133d048 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java @@ -129,9 +129,6 @@ public class CogroupedKStreamImpl extends AbstractStream imple new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize(), materializedInternal.keySerde(), materializedInternal.valueSerde(), - materializedInternal.queryableStoreName(), - null, - null, - null); + materializedInternal.queryableStoreName()); } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java index ddad482292d..15051ddfaa3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java @@ -47,18 +47,93 @@ class CogroupedStreamAggregateBuilder { CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) { this.builder = builder; } + KTable build(final Map, Aggregator> groupPatterns, + final Initializer initializer, + final NamedInternal named, + final StoreBuilder storeBuilder, + final Serde keySerde, + final Serde valueSerde, + final String queryableName) { + processRepartitions(groupPatterns, storeBuilder); + final Collection processors = new ArrayList<>(); + boolean stateCreated = false; + int counter = 0; + for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { + final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode( + named.suffixWithOrElseGet( + "-cogroup-agg-" + counter++, + builder, + CogroupedKStreamImpl.AGGREGATE_NAME), + stateCreated, + storeBuilder, + new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue())); + stateCreated = true; + processors.add(statefulProcessorNode); + builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); + } + return createTable(processors, named, keySerde, valueSerde, queryableName); + } - KTable build(final Map, Aggregator> groupPatterns, - final Initializer initializer, - final NamedInternal named, - final StoreBuilder storeBuilder, - final Serde keySerde, - final Serde valueSerde, - final String queryableName, - final Windows windows, - final SessionWindows sessionWindows, - final Merger sessionMerger) { + KTable build(final Map, Aggregator> groupPatterns, + final Initializer initializer, + final NamedInternal named, + final StoreBuilder storeBuilder, + final Serde keySerde, + final Serde valueSerde, + final String queryableName, + final Windows windows) { + processRepartitions(groupPatterns, storeBuilder); + final Collection processors = new ArrayList<>(); + boolean stateCreated = false; + int counter = 0; + for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { + final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode( + named.suffixWithOrElseGet( + "-cogroup-agg-" + counter++, + builder, + CogroupedKStreamImpl.AGGREGATE_NAME), + stateCreated, + storeBuilder, + new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, kGroupedStream.getValue())); + stateCreated = true; + processors.add(statefulProcessorNode); + builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); + } + return createTable(processors, named, keySerde, valueSerde, queryableName); + } + + KTable build(final Map, Aggregator> groupPatterns, + final Initializer initializer, + final NamedInternal named, + final StoreBuilder storeBuilder, + final Serde keySerde, + final Serde valueSerde, + final String queryableName, + final SessionWindows sessionWindows, + final Merger sessionMerger) { + processRepartitions(groupPatterns, storeBuilder); + final Collection processors = new ArrayList<>(); + boolean stateCreated = false; + int counter = 0; + for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { + final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode( + named.suffixWithOrElseGet( + "-cogroup-agg-" + counter++, + builder, + CogroupedKStreamImpl.AGGREGATE_NAME), + stateCreated, + storeBuilder, + new KStreamSessionWindowAggregate<>(sessionWindows, storeBuilder.name(), initializer, kGroupedStream.getValue(), sessionMerger)); + stateCreated = true; + processors.add(statefulProcessorNode); + builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); + } + return createTable(processors, named, keySerde, valueSerde, queryableName); + } + + private void processRepartitions(final Map, Aggregator> groupPatterns, + final StoreBuilder storeBuilder) { for (final KGroupedStreamImpl repartitionReqs : groupPatterns.keySet()) { if (repartitionReqs.repartitionRequired) { @@ -66,7 +141,7 @@ class CogroupedStreamAggregateBuilder { final OptimizableRepartitionNodeBuilder repartitionNodeBuilder = optimizableRepartitionNodeBuilder(); final String repartionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ? - repartitionReqs.userProvidedRepartitionTopicName : storeBuilder.name(); + repartitionReqs.userProvidedRepartitionTopicName : storeBuilder.name(); createRepartitionSource(repartionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde); @@ -85,33 +160,21 @@ class CogroupedStreamAggregateBuilder { groupedStreams.remove(kGrouped); kGrouped.ensureCopartitionWith(groupedStreams); - final Collection processors = new ArrayList<>(); - boolean stateCreated = false; - int counter = 0; - for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { - final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode( - kGroupedStream.getValue(), - initializer, - named.suffixWithOrElseGet( - "-cogroup-agg-" + counter++, - builder, - CogroupedKStreamImpl.AGGREGATE_NAME), - stateCreated, - storeBuilder, - windows, - sessionWindows, - sessionMerger); - stateCreated = true; - processors.add(statefulProcessorNode); - builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); - } + } + + KTable createTable(final Collection processors, + final NamedInternal named, + final Serde keySerde, + final Serde valueSerde, + final String queryableName) { + final String mergeProcessorName = named.suffixWithOrElseGet( "-cogroup-merge", builder, CogroupedKStreamImpl.MERGE_NAME); final ProcessorSupplier passThrough = new PassThrough<>(); final ProcessorGraphNode mergeNode = - new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName)); + new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName)); builder.addGraphNode(processors, mergeNode); @@ -126,41 +189,24 @@ class CogroupedStreamAggregateBuilder { builder); } - private StatefulProcessorNode getStatefulProcessorNode(final Aggregator aggregator, - final Initializer initializer, - final String processorName, - final boolean stateCreated, - final StoreBuilder storeBuilder, - final Windows windows, - final SessionWindows sessionWindows, - final Merger sessionMerger) { - - final ProcessorSupplier kStreamAggregate; - - if (windows == null && sessionWindows == null) { - kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator); - } else if (windows != null && sessionWindows == null) { - kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator); - } else if (windows == null && sessionMerger != null) { - kStreamAggregate = new KStreamSessionWindowAggregate<>(sessionWindows, storeBuilder.name(), initializer, aggregator, sessionMerger); - } else { - throw new IllegalArgumentException("must include windows OR sessionWindows + sessionMerger OR all must be null"); - } - + private StatefulProcessorNode getStatefulProcessorNode(final String processorName, + final boolean stateCreated, + final StoreBuilder storeBuilder, + final ProcessorSupplier kStreamAggregate) { final StatefulProcessorNode statefulProcessorNode; if (!stateCreated) { statefulProcessorNode = new StatefulProcessorNode<>( - processorName, - new ProcessorParameters<>(kStreamAggregate, processorName), - storeBuilder + processorName, + new ProcessorParameters<>(kStreamAggregate, processorName), + storeBuilder ); } else { statefulProcessorNode = new StatefulProcessorNode<>( - processorName, - new ProcessorParameters<>(kStreamAggregate, processorName), - new String[]{storeBuilder.name()} + processorName, + new ProcessorParameters<>(kStreamAggregate, processorName), + new String[]{storeBuilder.name()} ); } @@ -169,9 +215,9 @@ class CogroupedStreamAggregateBuilder { @SuppressWarnings("unchecked") private void createRepartitionSource(final String repartitionTopicNamePrefix, - final OptimizableRepartitionNodeBuilder optimizableRepartitionNodeBuilder, - final Serde keySerde, - final Serde valueSerde) { + final OptimizableRepartitionNodeBuilder optimizableRepartitionNodeBuilder, + final Serde keySerde, + final Serde valueSerde) { KStreamImpl.createRepartitionedSource(builder, keySerde, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java index f755a743f9a..20a1b086503 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java @@ -91,7 +91,8 @@ public class SessionWindowedCogroupedKStreamImpl extends materialized, builder, CogroupedKStreamImpl.AGGREGATE_NAME); - return aggregateBuilder.build(groupPatterns, + return aggregateBuilder.build( + groupPatterns, initializer, new NamedInternal(named), materialize(materializedInternal), @@ -101,7 +102,6 @@ public class SessionWindowedCogroupedKStreamImpl extends null, materializedInternal.valueSerde(), materializedInternal.queryableStoreName(), - null, sessionWindows, sessionMerger); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java index 5cb76783a48..f6169bda832 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java @@ -91,18 +91,16 @@ public class TimeWindowedCogroupedKStreamImpl extends Ab builder, CogroupedKStreamImpl.AGGREGATE_NAME); return aggregateBuilder.build( - groupPatterns, - initializer, - new NamedInternal(named), - materialize(materializedInternal), - materializedInternal.keySerde() != null ? - new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) - : null, - materializedInternal.valueSerde(), - materializedInternal.queryableStoreName(), - windows, - null, - null); + groupPatterns, + initializer, + new NamedInternal(named), + materialize(materializedInternal), + materializedInternal.keySerde() != null ? + new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) + : null, + materializedInternal.valueSerde(), + materializedInternal.queryableStoreName(), + windows); } @SuppressWarnings("deprecation")