MINOR: Align KTableAgg and KTableReduce (#6712)

Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Jeff Kim <kimkb2011@gmail.com>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
Matthias J. Sax 2019-05-11 11:54:58 +02:00
parent b5b2c5af2f
commit 4ed5efe7ba
3 changed files with 33 additions and 22 deletions

View File

@ -59,7 +59,6 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
store = (KeyValueStore<K, V>) context.getStateStore(storeName);
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
}

View File

@ -75,22 +75,29 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
throw new StreamsException("Record key for KTable aggregate operator with state " + storeName + " should not be null.");
}
T oldAgg = store.get(key);
if (oldAgg == null) {
oldAgg = initializer.apply();
}
T newAgg = oldAgg;
final T oldAgg = store.get(key);
final T intermediateAgg;
// first try to remove the old value
if (value.oldValue != null) {
newAgg = remove.apply(key, value.oldValue, newAgg);
if (value.oldValue != null && oldAgg != null) {
intermediateAgg = remove.apply(key, value.oldValue, oldAgg);
} else {
intermediateAgg = oldAgg;
}
// then try to add the new value
final T newAgg;
if (value.newValue != null) {
newAgg = add.apply(key, value.newValue, newAgg);
final T initializedAgg;
if (intermediateAgg == null) {
initializedAgg = initializer.apply();
} else {
initializedAgg = intermediateAgg;
}
newAgg = add.apply(key, value.newValue, initializedAgg);
} else {
newAgg = intermediateAgg;
}
// update the store with the new value

View File

@ -71,20 +71,25 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
}
final V oldAgg = store.get(key);
V newAgg = oldAgg;
final V intermediateAgg;
// first try to add the new value
if (value.newValue != null) {
if (newAgg == null) {
newAgg = value.newValue;
} else {
newAgg = addReducer.apply(newAgg, value.newValue);
}
// first try to remove the old value
if (value.oldValue != null && oldAgg != null) {
intermediateAgg = removeReducer.apply(oldAgg, value.oldValue);
} else {
intermediateAgg = oldAgg;
}
// then try to remove the old value
if (value.oldValue != null) {
newAgg = removeReducer.apply(newAgg, value.oldValue);
// then try to add the new value
final V newAgg;
if (value.newValue != null) {
if (intermediateAgg == null) {
newAgg = value.newValue;
} else {
newAgg = addReducer.apply(intermediateAgg, value.newValue);
}
} else {
newAgg = intermediateAgg;
}
// update the store with the new value