MINOR: Improve checks for CogroupedStreamAggregateBuilder (#9141)

Update `CogroupedStreamAggregateBuilder` to have individual builders depending
on the windowed aggregation, or lack thereof. This replaced passing in all options
into the builder, with all but the current type of aggregation set to null and then
checking to see which value was not null.

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <vvcephei@apache.org>
This commit is contained in:
leah 2020-08-10 17:32:44 -05:00 committed by GitHub
parent c8bccdd913
commit 3e6dcb14dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 121 additions and 80 deletions

View File

@ -129,9 +129,6 @@ public class CogroupedKStreamImpl<K, VOut> extends AbstractStream<K, VOut> imple
new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize(),
materializedInternal.keySerde(),
materializedInternal.valueSerde(),
materializedInternal.queryableStoreName(),
null,
null,
null);
materializedInternal.queryableStoreName());
}
}

View File

@ -47,18 +47,93 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
this.builder = builder;
}
<KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName) {
processRepartitions(groupPatterns, storeBuilder);
final Collection<StreamsGraphNode> processors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeBuilder,
new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
}
return createTable(processors, named, keySerde, valueSerde, queryableName);
}
<KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final Windows<W> windows,
final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {
<KR, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final Windows<W> windows) {
processRepartitions(groupPatterns, storeBuilder);
final Collection<StreamsGraphNode> processors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeBuilder,
new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, kGroupedStream.getValue()));
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
}
return createTable(processors, named, keySerde, valueSerde, queryableName);
}
<KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {
processRepartitions(groupPatterns, storeBuilder);
final Collection<StreamsGraphNode> processors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeBuilder,
new KStreamSessionWindowAggregate<>(sessionWindows, storeBuilder.name(), initializer, kGroupedStream.getValue(), sessionMerger));
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
}
return createTable(processors, named, keySerde, valueSerde, queryableName);
}
private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final StoreBuilder<?> storeBuilder) {
for (final KGroupedStreamImpl<K, ?> repartitionReqs : groupPatterns.keySet()) {
if (repartitionReqs.repartitionRequired) {
@ -66,7 +141,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final OptimizableRepartitionNodeBuilder<K, ?> repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
final String repartionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ?
repartitionReqs.userProvidedRepartitionTopicName : storeBuilder.name();
repartitionReqs.userProvidedRepartitionTopicName : storeBuilder.name();
createRepartitionSource(repartionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde);
@ -85,33 +160,21 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
groupedStreams.remove(kGrouped);
kGrouped.ensureCopartitionWith(groupedStreams);
final Collection<StreamsGraphNode> processors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
kGroupedStream.getValue(),
initializer,
named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeBuilder,
windows,
sessionWindows,
sessionMerger);
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
}
}
<KR, VIn> KTable<KR, VOut> createTable(final Collection<StreamsGraphNode> processors,
final NamedInternal named,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName) {
final String mergeProcessorName = named.suffixWithOrElseGet(
"-cogroup-merge",
builder,
CogroupedKStreamImpl.MERGE_NAME);
final ProcessorSupplier<K, VOut> passThrough = new PassThrough<>();
final ProcessorGraphNode<K, VOut> mergeNode =
new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
builder.addGraphNode(processors, mergeNode);
@ -126,41 +189,24 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
builder);
}
private <W extends Window> StatefulProcessorNode<K, ?> getStatefulProcessorNode(final Aggregator<? super K, Object, VOut> aggregator,
final Initializer<VOut> initializer,
final String processorName,
final boolean stateCreated,
final StoreBuilder<?> storeBuilder,
final Windows<W> windows,
final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {
final ProcessorSupplier<K, ?> kStreamAggregate;
if (windows == null && sessionWindows == null) {
kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator);
} else if (windows != null && sessionWindows == null) {
kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator);
} else if (windows == null && sessionMerger != null) {
kStreamAggregate = new KStreamSessionWindowAggregate<>(sessionWindows, storeBuilder.name(), initializer, aggregator, sessionMerger);
} else {
throw new IllegalArgumentException("must include windows OR sessionWindows + sessionMerger OR all must be null");
}
private StatefulProcessorNode<K, ?> getStatefulProcessorNode(final String processorName,
final boolean stateCreated,
final StoreBuilder<?> storeBuilder,
final ProcessorSupplier<K, ?> kStreamAggregate) {
final StatefulProcessorNode<K, ?> statefulProcessorNode;
if (!stateCreated) {
statefulProcessorNode =
new StatefulProcessorNode<>(
processorName,
new ProcessorParameters<>(kStreamAggregate, processorName),
storeBuilder
processorName,
new ProcessorParameters<>(kStreamAggregate, processorName),
storeBuilder
);
} else {
statefulProcessorNode =
new StatefulProcessorNode<>(
processorName,
new ProcessorParameters<>(kStreamAggregate, processorName),
new String[]{storeBuilder.name()}
processorName,
new ProcessorParameters<>(kStreamAggregate, processorName),
new String[]{storeBuilder.name()}
);
}
@ -169,9 +215,9 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
@SuppressWarnings("unchecked")
private <VIn> void createRepartitionSource(final String repartitionTopicNamePrefix,
final OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder,
final Serde<K> keySerde,
final Serde<?> valueSerde) {
final OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder,
final Serde<K> keySerde,
final Serde<?> valueSerde) {
KStreamImpl.createRepartitionedSource(builder,
keySerde,

View File

@ -91,7 +91,8 @@ public class SessionWindowedCogroupedKStreamImpl<K, V> extends
materialized,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME);
return aggregateBuilder.build(groupPatterns,
return aggregateBuilder.build(
groupPatterns,
initializer,
new NamedInternal(named),
materialize(materializedInternal),
@ -101,7 +102,6 @@ public class SessionWindowedCogroupedKStreamImpl<K, V> extends
null,
materializedInternal.valueSerde(),
materializedInternal.queryableStoreName(),
null,
sessionWindows,
sessionMerger);
}

View File

@ -91,18 +91,16 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
builder,
CogroupedKStreamImpl.AGGREGATE_NAME);
return aggregateBuilder.build(
groupPatterns,
initializer,
new NamedInternal(named),
materialize(materializedInternal),
materializedInternal.keySerde() != null ?
new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size())
: null,
materializedInternal.valueSerde(),
materializedInternal.queryableStoreName(),
windows,
null,
null);
groupPatterns,
initializer,
new NamedInternal(named),
materialize(materializedInternal),
materializedInternal.keySerde() != null ?
new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size())
: null,
materializedInternal.valueSerde(),
materializedInternal.queryableStoreName(),
windows);
}
@SuppressWarnings("deprecation")