mirror of https://github.com/apache/kafka.git
KAFKA-13785: [6/N][Emit final] Copy: Emit final for TimeWindowedKStreamImpl (#12100)
This is a copy PR of #11896, authored by @lihaosky (Hao Li): initial implementation of emit-final for TimeWindowedKStreamImpl. This PR is on top of #12030.
Author: Hao Li
Reviewers: John Roesler <vvcephei@apache.org>
This commit is contained in:
parent
3d087244d8
commit
cc2aa96ae4
@@ -1090,6 +1090,9 @@ public class StreamsConfig extends AbstractConfig {
        // Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847)
        public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";

        // Private API used to control the emit latency for windowed aggregation results for ON_WINDOW_CLOSE emit strategy
        public static final String EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION = "__emit.interval.ms.kstreams.windowed.aggregation__";

        // Private API used to control the usage of consistency offset vectors
        public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED = "__iq.consistency.offset"
            + ".vector.enabled__";
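For context, a minimal sketch of how this internal, non-public config can be overridden (the integration test below does exactly this, setting it to 0 so emit-final is attempted on every record instead of at the 1000 ms default interval):

    final Properties props = new Properties();
    // Internal config, not part of the public API: shortens the emit interval
    // so ON_WINDOW_CLOSE results are checked on every record.
    props.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0L);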
@@ -0,0 +1,74 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy;
import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy;

/**
 * This interface defines the strategy used to control how we emit results in a processor.
 */
public interface EmitStrategy {

    enum StrategyType {
        ON_WINDOW_CLOSE,
        ON_WINDOW_UPDATE
    }

    /**
     * Returns the strategy type.
     * @return Emit strategy type
     */
    StrategyType type();

    /**
     * This strategy indicates that the aggregated result for a window will only be emitted when the
     * window closes, instead of when there's an update to the window. Window close means that the
     * current event time is larger than (window end time + grace period).
     *
     * <p>This strategy should only be used for windows which can close. An exception will be thrown
     * if it's used with {@link UnlimitedWindow}.
     *
     * @see TimeWindows
     * @see SlidingWindows
     * @see SessionWindows
     * @see UnlimitedWindows
     * @see WindowUpdateStrategy
     *
     * @return WindowCloseStrategy instance
     */
    static EmitStrategy onWindowClose() {
        return new WindowCloseStrategy();
    }

    /**
     * This strategy indicates that the aggregated result for a window will be emitted every time
     * there's an update to the window, instead of only when the window closes.
     *
     * @see TimeWindows
     * @see SlidingWindows
     * @see SessionWindows
     * @see UnlimitedWindows
     * @see WindowCloseStrategy
     *
     * @return WindowUpdateStrategy instance
     */
    static EmitStrategy onWindowUpdate() {
        return new WindowUpdateStrategy();
    }
}
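As a usage sketch: in this PR, emitStrategy() lives on the internal TimeWindowedKStreamImpl (it is not yet on the public TimeWindowedKStream interface, hence the commented-out @Override further down), so the cast below mirrors what the integration test does. Topic names are placeholders and a java.time.Duration import is assumed:

    final StreamsBuilder builder = new StreamsBuilder();
    final TimeWindowedKStreamImpl<String, String, TimeWindow> windowed =
        (TimeWindowedKStreamImpl<String, String, TimeWindow>) builder
            .stream("input", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10)));
    windowed.emitStrategy(EmitStrategy.onWindowClose()) // one final result per closed window
        .count()
        .toStream()
        .to("output");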
@@ -18,11 +18,18 @@ package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;

@@ -30,6 +37,7 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;

@@ -37,6 +45,9 @@ import org.slf4j.LoggerFactory;

import java.util.Map;

import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION;
import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor;
import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

@@ -48,6 +59,7 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
    private final Windows<W> windows;
    private final Initializer<VAgg> initializer;
    private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
    private final EmitStrategy emitStrategy;

    private boolean sendOldValues = false;

@@ -55,10 +67,26 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
                                  final String storeName,
                                  final Initializer<VAgg> initializer,
                                  final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
        this(windows, storeName, EmitStrategy.onWindowUpdate(), initializer, aggregator);
    }

    public KStreamWindowAggregate(final Windows<W> windows,
                                  final String storeName,
                                  final EmitStrategy emitStrategy,
                                  final Initializer<VAgg> initializer,
                                  final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
        this.windows = windows;
        this.storeName = storeName;
        this.emitStrategy = emitStrategy;
        this.initializer = initializer;
        this.aggregator = aggregator;

        if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
            if (!(windows instanceof TimeWindows)) {
                throw new IllegalArgumentException("ON_WINDOW_CLOSE strategy is only supported for "
                    + "TimeWindows and SlidingWindows for TimeWindowedKStream");
            }
        }
    }

    @Override
@@ -80,22 +108,54 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
        private TimestampedWindowStore<KIn, VAgg> windowStore;
        private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
        private Sensor droppedRecordsSensor;
        private Sensor emittedRecordsSensor;
        private Sensor emitFinalLatencySensor;
        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
        private long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP;
        private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext;
        private final TimeTracker timeTracker = new TimeTracker();
        private final Time time = Time.SYSTEM;

        @Override
        public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
            super.init(context);
            final InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext =
                (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
            internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
            final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
            final String threadId = Thread.currentThread().getName();
            droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
            emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(),
                internalProcessorContext.currentNode().name(), metrics);
            emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(),
                internalProcessorContext.currentNode().name(), metrics);
            windowStore = context.getStateStore(storeName);
            tupleForwarder = new TimestampedTupleForwarder<>(
                windowStore,
                context,
                new TimestampedCacheFlushListener<>(context),
                sendOldValues);

            if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
                // Don't set a flush listener, which would emit the cached results
                tupleForwarder = new TimestampedTupleForwarder<>(
                    windowStore,
                    context,
                    sendOldValues);
            } else {
                tupleForwarder = new TimestampedTupleForwarder<>(
                    windowStore,
                    context,
                    new TimestampedCacheFlushListener<>(context),
                    sendOldValues);
            }

            // Restore the last emit close time for the ON_WINDOW_CLOSE strategy
            if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
                final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName);
                if (lastEmitTime != null) {
                    lastEmitWindowCloseTime = lastEmitTime;
                }
                final long emitInterval = StreamsConfig.InternalConfig.getLong(
                    context.appConfigs(),
                    EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
                    1000L
                );
                timeTracker.setEmitInterval(emitInterval);
            }
        }

        @Override
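A hedged sketch of the emit-progress checkpoint this init block relies on. Both calls appear in this diff; the surrounding commit machinery that persists the processor metadata alongside the task's progress is assumed:

    // After each successful emit pass (see tryEmitFinalResult below), progress is recorded:
    internalProcessorContext.addProcessorMetadataKeyValue(storeName, windowCloseTime);
    // After a restart or state restoration, init() reads it back, so emission
    // resumes from the last emitted window close time instead of re-emitting:
    final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName);
    if (lastEmitTime != null) {
        lastEmitWindowCloseTime = lastEmitTime;
    }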
@@ -120,15 +180,16 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
            // first get the matching windows
            final long timestamp = record.timestamp();
            observedStreamTime = Math.max(observedStreamTime, timestamp);
            final long closeTime = observedStreamTime - windows.gracePeriodMs();
            final long windowCloseTime = observedStreamTime - windows.gracePeriodMs();

            final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);

            // try update the window, and create the new window for the rest of unmatched window that do not exist yet
            // try to update the windows whose end time is still larger than the window close time,
            // and create new windows for the remaining matched windows that do not exist yet;
            for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
                final Long windowStart = entry.getKey();
                final long windowEnd = entry.getValue().end();
                if (windowEnd > closeTime) {
                if (windowEnd > windowCloseTime) {
                    final ValueAndTimestamp<VAgg> oldAggAndTimestamp = windowStore.fetch(record.key(), windowStart);
                    VAgg oldAgg = getValueOrNull(oldAggAndTimestamp);
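A worked example of this close check, assuming hopping windows of size 10 ms advancing by 5 ms with no grace:

    // record timestamp = 12, observedStreamTime = 12, gracePeriodMs = 0
    //   -> windowCloseTime = 12
    // windowsFor(12) = { 5 -> [5,15), 10 -> [10,20) }
    //   [5,15):  windowEnd 15 > 12 -> still open, aggregate updated
    //   [10,20): windowEnd 20 > 12 -> still open, aggregate updated
    // a record at timestamp 8 arriving now would also match [0,10), but
    //   windowEnd 10 <= 12 -> expired; it is dropped and counted by droppedRecordsSensor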
@@ -146,10 +207,12 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements

                    // update the store with the new value
                    windowStore.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp), windowStart);
                    tupleForwarder.maybeForward(
                        record.withKey(new Windowed<>(record.key(), entry.getValue()))
                            .withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
                            .withTimestamp(newTimestamp));
                    if (emitStrategy.type() == StrategyType.ON_WINDOW_UPDATE) {
                        tupleForwarder.maybeForward(
                            record.withKey(new Windowed<>(record.key(), entry.getValue()))
                                .withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
                                .withTimestamp(newTimestamp));
                    }
                } else {
                    if (context().recordMetadata().isPresent()) {
                        final RecordMetadata recordMetadata = context().recordMetadata().get();
@@ -165,7 +228,7 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
                            recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
                            record.timestamp(),
                            windowStart, windowEnd,
                            closeTime,
                            windowCloseTime,
                            observedStreamTime
                        );
                    } else {
@@ -177,13 +240,88 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
                            "streamTime=[{}]",
                            record.timestamp(),
                            windowStart, windowEnd,
                            closeTime,
                            windowCloseTime,
                            observedStreamTime
                        );
                    }
                    droppedRecordsSensor.record();
                }
            }

            tryEmitFinalResult(record, windowCloseTime);
        }

        private void tryEmitFinalResult(final Record<KIn, VIn> record, final long windowCloseTime) {
            if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
                return;
            }

            final long now = internalProcessorContext.currentSystemTimeMs();
            // Throttle the emit frequency as an optimization; the tradeoff is that we need to remember
            // the window close time of the last emit, so that we can restart from there on the next emit
            if (now < timeTracker.nextTimeToEmit) {
                return;
            }

            // Schedule the next emit time based on now, so that a large jump in system time
            // does not cause this to be triggered on every record
            timeTracker.nextTimeToEmit = now;
            timeTracker.advanceNextTimeToEmit();

            // The window close time has not progressed, so there are no windows to close and hence no records to emit
            if (lastEmitWindowCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitWindowCloseTime >= windowCloseTime) {
                return;
            }

            final long emitRangeUpperBoundInclusive = windowCloseTime - windows.size();
            // No window has ever closed, hence there is no need to emit any records
            if (emitRangeUpperBoundInclusive < 0) {
                return;
            }

            // Set emitRangeLowerBoundInclusive to -1L if lastEmitWindowCloseTime was not set, so that
            // we fetch from 0L the first time; otherwise set it to lastEmitWindowCloseTime - windows.size().
            //
            // Note that if we get here, emitRangeUpperBoundInclusive >= 0, which means windowCloseTime >= windows.size().
            // Because lastEmitWindowCloseTime was always set to a windowCloseTime that passed this same check,
            // lastEmitWindowCloseTime - windows.size() is always >= 0.
            // As a result, emitRangeLowerBoundInclusive is always >= 0.
            final long emitRangeLowerBoundInclusive = lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ?
                -1L : lastEmitWindowCloseTime - windows.size();

            if (lastEmitWindowCloseTime != ConsumerRecord.NO_TIMESTAMP) {
                final Map<Long, W> matchedCloseWindows = windows.windowsFor(emitRangeUpperBoundInclusive);
                final Map<Long, W> matchedEmitWindows = windows.windowsFor(emitRangeLowerBoundInclusive);

                // Don't fetch from the store if no new windows have closed since the last time we emitted
                if (matchedCloseWindows.equals(matchedEmitWindows)) {
                    log.trace("no new windows to emit. LastEmitCloseTime={}, newCloseTime={}",
                        lastEmitWindowCloseTime, windowCloseTime);
                    return;
                }
            }

            final long startMs = time.milliseconds();

            final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowToEmit = windowStore
                .fetchAll(emitRangeLowerBoundInclusive + 1, emitRangeUpperBoundInclusive);

            int emittedCount = 0;
            while (windowToEmit.hasNext()) {
                emittedCount++;
                final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv = windowToEmit.next();
                tupleForwarder.maybeForward(
                    record.withKey(kv.key)
                        .withValue(new Change<>(kv.value.value(), null))
                        .withTimestamp(kv.value.timestamp())
                        .withHeaders(record.headers()));
            }
            emittedRecordsSensor.record(emittedCount);
            emitFinalLatencySensor.record(time.milliseconds() - startMs);

            lastEmitWindowCloseTime = windowCloseTime;
            internalProcessorContext.addProcessorMetadataKeyValue(storeName, windowCloseTime);
        }
    }
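To make the range arithmetic concrete, a worked example assuming tumbling windows of size 10 ms with a 5 ms grace period:

    // windows.size() = 10, grace = 5
    // observedStreamTime = 27 -> windowCloseTime = 22
    // emitRangeUpperBoundInclusive = 22 - 10 = 12
    //   i.e. every window starting at or before 12 has already closed
    // first pass: lastEmitWindowCloseTime == NO_TIMESTAMP -> lower bound = -1
    //   fetchAll(0, 12) emits the final results for [0,10) and [10,20)
    // later: observedStreamTime = 33 -> windowCloseTime = 28,
    //   lower bound = 22 - 10 = 12, upper bound = 28 - 10 = 18;
    //   windowsFor(12) equals windowsFor(18), so no new window starts fall in
    //   that range and the store fetch is skipped entirely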
@@ -20,12 +20,15 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;

@@ -39,6 +42,7 @@ import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;

import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
@@ -47,6 +51,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr

    private final Windows<W> windows;
    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
    private EmitStrategy emitStrategy = EmitStrategy.onWindowUpdate();

    TimeWindowedKStreamImpl(final Windows<W> windows,
                            final InternalStreamsBuilder builder,

@@ -107,7 +112,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
        return aggregateBuilder.build(
            new NamedInternal(aggregateName),
            materialize(materializedInternal),
            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
            materializedInternal.queryableStoreName(),
            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
            materializedInternal.valueSerde());

@@ -155,7 +160,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
        return aggregateBuilder.build(
            new NamedInternal(aggregateName),
            materialize(materializedInternal),
            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, initializer, aggregator),
            materializedInternal.queryableStoreName(),
            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
            materializedInternal.valueSerde());

@@ -202,12 +207,22 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
        return aggregateBuilder.build(
            new NamedInternal(reduceName),
            materialize(materializedInternal),
            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
            new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
            materializedInternal.queryableStoreName(),
            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
            materializedInternal.valueSerde());
    }

    //@Override
    public TimeWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy) {
        if (this.windows instanceof UnlimitedWindows
            && emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
            throw new IllegalArgumentException("ON_WINDOW_CLOSE emit strategy cannot be used for UnlimitedWindows");
        }
        this.emitStrategy = emitStrategy;
        return this;
    }

    private <VR> StoreBuilder<TimestampedWindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
        if (supplier == null) {
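The //@Override marker suggests emitStrategy() is intended for the public TimeWindowedKStream interface in a follow-up; for now it is only reachable on the internal impl class. A minimal sketch of the guard above, mirroring the shouldThrowUnlimitedWindows integration test further down (the stream variable and its setup are assumed):

    final TimeWindowedKStreamImpl<String, String, TimeWindow> unlimited =
        (TimeWindowedKStreamImpl<String, String, TimeWindow>) stream
            .groupByKey()
            .windowedBy(UnlimitedWindows.of().startOn(ofEpochMilli(0)));
    // ON_WINDOW_CLOSE is rejected because unlimited windows never close:
    assertThrows(IllegalArgumentException.class,
        () -> unlimited.emitStrategy(EmitStrategy.onWindowClose()));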
@@ -232,11 +247,19 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
                );
                break;
            case ROCKS_DB:
                supplier = Stores.persistentTimestampedWindowStore(
                    materialized.storeName(),
                    Duration.ofMillis(retentionPeriod),
                    Duration.ofMillis(windows.size()),
                    false
                supplier = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE ?
                    RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
                        materialized.storeName(),
                        Duration.ofMillis(retentionPeriod),
                        Duration.ofMillis(windows.size()),
                        false,
                        false
                    ) :
                    Stores.persistentTimestampedWindowStore(
                        materialized.storeName(),
                        Duration.ofMillis(retentionPeriod),
                        Duration.ofMillis(windows.size()),
                        false
                    );
                break;
            default:
@@ -45,6 +45,14 @@ class TimestampedTupleForwarder<K, V> {
        cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
    }

    TimestampedTupleForwarder(final StateStore store,
                              final ProcessorContext<K, Change<V>> context,
                              final boolean sendOldValues) {
        this.context = (InternalProcessorContext<K, Change<V>>) context;
        this.sendOldValues = sendOldValues;
        cachingEnabled = false;
    }

    public void maybeForward(final Record<K, Change<V>> record) {
        if (!cachingEnabled) {
            if (sendOldValues) {
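In other words (a reading of the diff, not new behavior): the new three-argument constructor never registers a flush listener and hard-codes cachingEnabled = false, so on the ON_WINDOW_CLOSE path maybeForward() always takes the !cachingEnabled branch and forwards directly. With the original constructor on a caching store, maybeForward() stays a no-op and emission is deferred to cache flushes:

    // ON_WINDOW_CLOSE forwarder: no flush listener, cachingEnabled == false
    //   -> maybeForward() forwards each emitted final result immediately
    // ON_WINDOW_UPDATE forwarder on a caching store: flush listener registered,
    //   cachingEnabled == true -> maybeForward() is a no-op; the cache flush
    //   listener forwards the latest value per key instead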
@@ -0,0 +1,31 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.kstream.internals.emitstrategy;

import org.apache.kafka.streams.kstream.EmitStrategy;

/**
 * An emit strategy that indicates to only emit output when a window closes.
 */
public class WindowCloseStrategy implements EmitStrategy {

    @Override
    public StrategyType type() {
        return StrategyType.ON_WINDOW_CLOSE;
    }

}
@@ -0,0 +1,30 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.kstream.internals.emitstrategy;

import org.apache.kafka.streams.kstream.EmitStrategy;

/**
 * An emit strategy that indicates to emit output every time a window gets an update.
 */
public class WindowUpdateStrategy implements EmitStrategy {

    @Override
    public StrategyType type() {
        return StrategyType.ON_WINDOW_UPDATE;
    }
}
@@ -21,6 +21,9 @@ import org.apache.kafka.common.metrics.Sensor.RecordingLevel;

import java.util.Map;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.AVG_LATENCY_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.MAX_LATENCY_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORD_E2E_LATENCY;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORD_E2E_LATENCY_AVG_DESCRIPTION;

@@ -31,6 +34,8 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMinAndMaxToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;

public class ProcessorNodeMetrics {
    private ProcessorNodeMetrics() {}

@@ -62,6 +67,17 @@ public class ProcessorNodeMetrics {
    private static final String FORWARD_RATE_DESCRIPTION =
        RATE_DESCRIPTION_PREFIX + FORWARD_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;

    private static final String EMITTED_RECORDS = "window-aggregate-final-emit";
    private static final String EMITTED_RECORDS_DESCRIPTION = "emit final records";
    private static final String EMITTED_RECORDS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + EMITTED_RECORDS_DESCRIPTION;
    private static final String EMITTED_RECORDS_RATE_DESCRIPTION =
        RATE_DESCRIPTION_PREFIX + EMITTED_RECORDS_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;

    private static final String EMIT_FINAL_LATENCY = EMITTED_RECORDS + LATENCY_SUFFIX;
    private static final String EMIT_FINAL_DESCRIPTION = "calls to emit final";
    private static final String EMIT_FINAL_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION + EMIT_FINAL_DESCRIPTION;
    private static final String EMIT_FINAL_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION + EMIT_FINAL_DESCRIPTION;

    public static Sensor suppressionEmitSensor(final String threadId,
                                               final String taskId,
                                               final String processorNodeId,
@@ -165,6 +181,42 @@ public class ProcessorNodeMetrics {
        return sensor;
    }

    public static Sensor emitFinalLatencySensor(final String threadId,
                                                final String taskId,
                                                final String processorNodeId,
                                                final StreamsMetricsImpl streamsMetrics) {
        final String sensorName = processorNodeId + "-" + EMIT_FINAL_LATENCY;
        final Sensor sensor = streamsMetrics.nodeLevelSensor(threadId, taskId, processorNodeId, sensorName, RecordingLevel.DEBUG);
        final Map<String, String> tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId);
        addAvgAndMaxToSensor(
            sensor,
            PROCESSOR_NODE_LEVEL_GROUP,
            tagMap,
            EMIT_FINAL_LATENCY,
            EMIT_FINAL_AVG_LATENCY_DESCRIPTION,
            EMIT_FINAL_MAX_LATENCY_DESCRIPTION
        );
        return sensor;
    }

    public static Sensor emittedRecordsSensor(final String threadId,
                                              final String taskId,
                                              final String processorNodeId,
                                              final StreamsMetricsImpl streamsMetrics) {
        final String sensorName = processorNodeId + "-" + EMITTED_RECORDS;
        final Sensor sensor = streamsMetrics.nodeLevelSensor(threadId, taskId, processorNodeId, sensorName, RecordingLevel.DEBUG);
        final Map<String, String> tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId);
        addRateOfSumAndSumMetricsToSensor(
            sensor,
            PROCESSOR_NODE_LEVEL_GROUP,
            tagMap,
            EMITTED_RECORDS,
            EMITTED_RECORDS_RATE_DESCRIPTION,
            EMITTED_RECORDS_TOTAL_DESCRIPTION
        );
        return sensor;
    }

    private static Sensor throughputParentSensor(final String threadId,
                                                 final String taskId,
                                                 final String metricNamePrefix,

@@ -207,4 +259,6 @@ public class ProcessorNodeMetrics {
        );
        return sensor;
    }

}
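For reference, a sketch of the metric names these sensors should produce, assuming the usual StreamsMetricsImpl suffix conventions ("-rate"/"-total" and "-avg"/"-max", with LATENCY_SUFFIX = "-latency"); both sensors are processor-node-level and only recorded at DEBUG level:

    // from emittedRecordsSensor (addRateOfSumAndSumMetricsToSensor):
    //   window-aggregate-final-emit-rate
    //   window-aggregate-final-emit-total
    // from emitFinalLatencySensor (addAvgAndMaxToSensor):
    //   window-aggregate-final-emit-latency-avg
    //   window-aggregate-final-emit-latency-max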
@@ -21,6 +21,10 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.WindowStore;

@@ -34,6 +38,8 @@ public class RocksDBTimeOrderedWindowStore

    private final boolean retainDuplicates;
    private final long windowSize;

    private StateStoreContext stateStoreContext;
    private int seqnum = 0;

    RocksDBTimeOrderedWindowStore(

@@ -49,6 +55,7 @@ public class RocksDBTimeOrderedWindowStore

    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        stateStoreContext = context;
        wrapped().init(context, root);
    }

@@ -168,6 +175,21 @@ public class RocksDBTimeOrderedWindowStore
        return wrapped().hasIndex();
    }

    @Override
    public <R> QueryResult<R> query(final Query<R> query,
                                    final PositionBound positionBound,
                                    final QueryConfig config) {

        return StoreQueryUtils.handleBasicQueries(
            query,
            positionBound,
            config,
            this,
            getPosition(),
            stateStoreContext
        );
    }

    private void maybeUpdateSeqnumForDups() {
        if (retainDuplicates) {
            seqnum = (seqnum + 1) & 0x7FFFFFFF;
@@ -0,0 +1,509 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.integration;

import java.util.Collection;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;

import static java.time.Duration.ofMillis;
import static java.time.Instant.ofEpochMilli;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThrows;
@SuppressWarnings({"unchecked"})
@Category({IntegrationTest.class})
@RunWith(Parameterized.class)
public class TimeWindowedKStreamIntegrationTest {
    private static final int NUM_BROKERS = 1;

    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS,
        mkProperties(
            mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamps
        )
    );

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String streamOneInput;
    private String streamTwoInput;
    private String outputTopic;

    @Rule
    public TestName testName = new TestName();

    @Parameter
    public StrategyType type;

    @Parameter(1)
    public boolean withCache;

    @Parameter(2)
    public EmitStrategy emitStrategy;

    private boolean emitFinal;

    @Parameterized.Parameters(name = "{0}_{1}")
    public static Collection<Object[]> getEmitStrategy() {
        return asList(new Object[][] {
            {StrategyType.ON_WINDOW_UPDATE, true, EmitStrategy.onWindowUpdate()},
            {StrategyType.ON_WINDOW_UPDATE, false, EmitStrategy.onWindowUpdate()},
            {StrategyType.ON_WINDOW_CLOSE, true, EmitStrategy.onWindowClose()},
            {StrategyType.ON_WINDOW_CLOSE, false, EmitStrategy.onWindowClose()}
        });
    }

    @Before
    public void before() throws InterruptedException {
        builder = new StreamsBuilder();
        createTopics();
        streamsConfiguration = new Properties();
        final String safeTestName = safeUniqueTestName(getClass(), testName);
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        streamsConfiguration.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0); // Always process
        streamsConfiguration.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Long.MAX_VALUE); // Don't expire the changelog

        emitFinal = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE;
    }

    @After
    public void whenShuttingDown() throws IOException {
        if (kafkaStreams != null) {
            kafkaStreams.close();
            kafkaStreams.cleanUp();
        }
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }
    @Test
    public void shouldAggregateWindowedWithNoGrace() throws Exception {
        produceMessages(
            streamOneInput,
            new KeyValueTimestamp<>("A", "1", 0),
            new KeyValueTimestamp<>("A", "2", 5),
            new KeyValueTimestamp<>("A", "3", 10), // close [0, 10)
            new KeyValueTimestamp<>("B", "4", 6),  // late and skipped for [0, 10)
            new KeyValueTimestamp<>("B", "5", 11),
            new KeyValueTimestamp<>("B", "6", 15), // close [5, 15)
            new KeyValueTimestamp<>("C", "7", 25)  // close [10, 20), [15, 25)
        );

        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 10L);
        // TODO: remove this cast https://issues.apache.org/jira/browse/KAFKA-13800
        final TimeWindowedKStreamImpl<String, String, TimeWindow> windowedStream = (TimeWindowedKStreamImpl<String, String, TimeWindow>) builder
            .stream(streamOneInput, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10L)).advanceBy(ofMillis(5L)));
        windowedStream.emitStrategy(emitStrategy)
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                getMaterialized()
            )
            .toStream()
            .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));

        startStreams();

        final List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
            new StringDeserializer(),
            10L,
            String.class,
            emitFinal ? 6 : 12);

        final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
        if (emitFinal) {
            expectResult = asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+6", 15)
            );
        } else {
            expectResult = asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1", 0),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2", 5),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4", 6),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5", 11),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+6", 15),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(20L, 30L)), "0+7", 25),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)), "0+7", 25)
            );
        }

        assertThat(windowedMessages, is(expectResult));
    }
    @Test
    public void shouldAggregateWindowedWithGrace() throws Exception {
        produceMessages(
            streamOneInput,
            new KeyValueTimestamp<>("A", "1", 0),
            new KeyValueTimestamp<>("A", "2", 5),
            new KeyValueTimestamp<>("A", "3", 10),
            new KeyValueTimestamp<>("B", "4", 6),
            new KeyValueTimestamp<>("B", "5", 11),
            new KeyValueTimestamp<>("B", "6", 15), // close [0, 10), output A, B [0, 10)
            new KeyValueTimestamp<>("C", "7", 25)  // close [5, 15), [10, 20)
        );

        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 10L);
        final TimeWindowedKStreamImpl<String, String, TimeWindow> windowedStream = (TimeWindowedKStreamImpl<String, String, TimeWindow>) builder
            .stream(streamOneInput, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(5)).advanceBy(ofMillis(5L)));
        windowedStream.emitStrategy(emitStrategy)
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                getMaterialized()
            )
            .toStream()
            .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));

        startStreams();

        final List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
            new StringDeserializer(),
            10L,
            String.class,
            emitFinal ? 6 : 13);

        final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
        if (emitFinal) {
            expectResult = asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+4", 6),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15)
            );
        } else {
            expectResult = asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1", 0),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2", 5),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+4", 6),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4", 6),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5", 11),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+6", 15),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(20L, 30L)), "0+7", 25),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)), "0+7", 25)
            );
        }

        assertThat(windowedMessages, is(expectResult));
    }
    @Test
    public void shouldRestoreAfterJoinRestart() throws Exception {
        produceMessages(
            streamOneInput,
            new KeyValueTimestamp<>("A", "L1", 0),
            new KeyValueTimestamp<>("A", "L1", 5),
            new KeyValueTimestamp<>("B", "L2", 11), // close [0, 10)
            new KeyValueTimestamp<>("B", "L2", 15), // close [5, 15)
            new KeyValueTimestamp<>("C", "L3", 25)  // close [15, 25), [10, 20)
        );

        produceMessages(
            streamTwoInput,
            new KeyValueTimestamp<>("A", "R1", 0),
            new KeyValueTimestamp<>("A", "R1", 5),
            new KeyValueTimestamp<>("B", "R2", 11), // close [0, 10)
            new KeyValueTimestamp<>("B", "R2", 15), // close [5, 15)
            new KeyValueTimestamp<>("C", "R3", 25)  // close [15, 25), [10, 20)
        );

        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 10L);
        final KStream<String, String> streamOne = builder.stream(streamOneInput, Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> streamTwo = builder.stream(streamTwoInput, Consumed.with(Serdes.String(), Serdes.String()));

        final KStream<String, String> joinedStream = streamOne
            .join(streamTwo, (v1, v2) -> v1 + "," + v2,
                JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(2)));

        final TimeWindowedKStreamImpl<String, String, TimeWindow> windowedStream = (TimeWindowedKStreamImpl<String, String, TimeWindow>) joinedStream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10L)).advanceBy(ofMillis(5L)));

        windowedStream.emitStrategy(emitStrategy)
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                getMaterialized()
            )
            .toStream()
            .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));

        startStreams();

        List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
            new StringDeserializer(),
            10L,
            String.class,
            emitFinal ? 5 : 9);

        List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
        if (emitFinal) {
            expectResult = asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+L1,R1+L1,R1", 5),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+L1,R1", 5),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+L2,R2", 11),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+L2,R2+L2,R2", 15),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+L2,R2", 15)
            );
        } else {
            expectResult = asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+L1,R1", 0),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+L1,R1+L1,R1", 5),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+L1,R1", 5),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+L2,R2", 11),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+L2,R2", 11),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+L2,R2+L2,R2", 15),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+L2,R2", 15),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(20L, 30L)), "0+L3,R3", 25),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)), "0+L3,R3", 25)
            );
        }

        assertThat(windowedMessages, is(expectResult));

        kafkaStreams.close();
        kafkaStreams.cleanUp(); // Purge the store to force restoration

        produceMessages(
            streamOneInput,
            new KeyValueTimestamp<>("C", "L3", 35) // close [20, 30), [25, 35)
        );
        produceMessages(
            streamTwoInput,
            new KeyValueTimestamp<>("C", "R3", 35) // close [20, 30), [25, 35)
        );

        // Restart
        startStreams();

        windowedMessages = receiveMessagesWithTimestamp(
            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
            new StringDeserializer(),
            10L,
            String.class,
            2);

        if (emitFinal) {
            // Output just the newly closed windows for C
            expectResult = asList(
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(20L, 30L)), "0+L3,R3", 25),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)), "0+L3,R3", 25)
            );
        } else {
            expectResult = asList(
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(30L, 40L)), "0+L3,R3", 35),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(35L, 45L)), "0+L3,R3", 35)
            );
        }

        assertThat(windowedMessages, is(expectResult));
    }
    @Test
    public void shouldThrowUnlimitedWindows() {
        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 10L);
        final TimeWindowedKStreamImpl<String, String, TimeWindow> windowedStream = (TimeWindowedKStreamImpl<String, String, TimeWindow>) builder
            .stream(streamOneInput, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(
                UnlimitedWindows.of().startOn(ofEpochMilli(0))
            );

        if (emitFinal) {
            assertThrows(IllegalArgumentException.class, () -> windowedStream.emitStrategy(emitStrategy));
        } else {
            windowedStream.emitStrategy(emitStrategy);
        }
    }
    private void produceMessages(final String topic, final KeyValueTimestamp<String, String>... records) {
        IntegrationTestUtils.produceSynchronously(
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(),
                StringSerializer.class,
                StringSerializer.class),
            false,
            topic,
            Optional.empty(),
            Arrays.asList(records)
        );
    }

    private Materialized getMaterialized() {
        if (withCache) {
            return Materialized.with(null, new StringSerde()).withCachingEnabled();
        }
        return Materialized.with(null, new StringSerde()).withCachingDisabled();
    }

    private void createTopics() throws InterruptedException {
        final String safeTestName = safeUniqueTestName(getClass(), testName);
        streamOneInput = "stream-one-" + safeTestName;
        streamTwoInput = "stream-two-" + safeTestName;
        outputTopic = "output-" + safeTestName;
        CLUSTER.createTopic(streamOneInput, 1, 1);
        CLUSTER.createTopic(streamTwoInput, 1, 1);
        CLUSTER.createTopic(outputTopic);
    }

    private void startStreams() {
        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
        kafkaStreams.start();
    }

    private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
                                                                              final Deserializer<V> valueDeserializer,
                                                                              final long windowSize,
                                                                              final Class innerClass,
                                                                              final int numMessages) throws Exception {
        final String safeTestName = safeUniqueTestName(getClass(), testName);
        final Properties consumerProperties = new Properties();
        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
        consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, windowSize);
        if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
            consumerProperties.setProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE,
                Serdes.serdeFrom(innerClass).getClass().getName());
        }
        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
            consumerProperties,
            outputTopic,
            numMessages,
            60 * 1000);
    }
}
File diff suppressed because it is too large
@ -17,20 +17,26 @@
|
|||
|
||||
package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.Collection;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.TestInputTopic;

@@ -42,34 +48,71 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;

import static java.time.Duration.ofMillis;
import static java.time.Instant.ofEpochMilli;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

@RunWith(Parameterized.class)
public class TimeWindowedKStreamImplTest {
    private static final String TOPIC = "input";
    private static final Windowed<String> KEY_1_WINDOW_0 = new Windowed<>("1", new TimeWindow(0L, 500L));
    private static final Windowed<String> KEY_1_WINDOW_1 = new Windowed<>("1", new TimeWindow(500L, 1000L));
    private static final Windowed<String> KEY_2_WINDOW_1 = new Windowed<>("2", new TimeWindow(500L, 1000L));
    private static final Windowed<String> KEY_2_WINDOW_2 = new Windowed<>("2", new TimeWindow(1000L, 1500L));

    private final StreamsBuilder builder = new StreamsBuilder();
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
-   private TimeWindowedKStream<String, String> windowedStream;
+   private TimeWindowedKStreamImpl<String, String, TimeWindow> windowedStream;

    @Parameter
    public StrategyType type;

    @Parameter(1)
    public boolean withCache;

    @Parameter(2)
    public EmitStrategy emitStrategy;

    private boolean emitFinal;

    @Parameterized.Parameters(name = "{0}_{1}")
    public static Collection<Object[]> getKeySchema() {
        return asList(new Object[][] {
            {StrategyType.ON_WINDOW_UPDATE, true, EmitStrategy.onWindowUpdate()},
            {StrategyType.ON_WINDOW_UPDATE, false, EmitStrategy.onWindowUpdate()},
            {StrategyType.ON_WINDOW_CLOSE, true, EmitStrategy.onWindowClose()},
            {StrategyType.ON_WINDOW_CLOSE, false, EmitStrategy.onWindowClose()}
        });
    }

@SuppressWarnings("unchecked")
|
||||
@Before
|
||||
public void before() {
|
||||
emitFinal = type.equals(StrategyType.ON_WINDOW_CLOSE);
|
||||
// Set interval to 0 so that it always tries to emit
|
||||
props.setProperty(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, "0");
|
||||
final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
|
||||
windowedStream = stream.
|
||||
// TODO: remove this cast https://issues.apache.org/jira/browse/KAFKA-13800
|
||||
windowedStream = (TimeWindowedKStreamImpl<String, String, TimeWindow>) (stream.
|
||||
groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
|
||||
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)));
|
||||
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L))));
|
||||
}
|
||||
|
||||
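    // Note (editor): the tests below pipe the records from processData() through 500 ms
    // tumbling windows with no grace period. Once stream time reaches 1000, windows
    // [0,500) and [500,1000) are closed, while [1000,1500) remains open; that is why the
    // emit-final (ON_WINDOW_CLOSE) branches expect no output for KEY_2_WINDOW_2.
    // For reference, the emit-final API under test is used as (sketch, names as in this file):
    //   stream.groupByKey()
    //         .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)))
    //         .emitStrategy(EmitStrategy.onWindowClose())
    //         .count();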
    @Test
    public void shouldCountWindowed() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
        windowedStream
            .emitStrategy(emitStrategy)
            .count()
            .toStream()
            .process(supplier);

@@ -77,24 +120,37 @@ public class TimeWindowedKStreamImplTest {
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
        }
        assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
                .get(new Windowed<>("1", new TimeWindow(0L, 500L))),
            equalTo(ValueAndTimestamp.make(2L, 15L)));
        assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
                .get(new Windowed<>("2", new TimeWindow(500L, 1000L))),
            equalTo(ValueAndTimestamp.make(2L, 550L)));
        assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
                .get(new Windowed<>("1", new TimeWindow(500L, 1000L))),
            equalTo(ValueAndTimestamp.make(1L, 500L)));
        final ArrayList<KeyValueTimestamp<Windowed<String>, Long>> processed = supplier.theCapturedProcessor().processed();

        if (emitFinal) {
            assertEquals(
                asList(
                    new KeyValueTimestamp<>(KEY_1_WINDOW_0, 2L, 15L),
                    new KeyValueTimestamp<>(KEY_1_WINDOW_1, 1L, 500L),
                    new KeyValueTimestamp<>(KEY_2_WINDOW_1, 2L, 550L)
                ),
                processed
            );
        } else {
            assertEquals(
                asList(
                    new KeyValueTimestamp<>(KEY_1_WINDOW_0, 1L, 10L),
                    new KeyValueTimestamp<>(KEY_1_WINDOW_0, 2L, 15L),
                    new KeyValueTimestamp<>(KEY_1_WINDOW_1, 1L, 500L),
                    new KeyValueTimestamp<>(KEY_2_WINDOW_1, 1L, 550L),
                    new KeyValueTimestamp<>(KEY_2_WINDOW_1, 2L, 550L),
                    new KeyValueTimestamp<>(KEY_2_WINDOW_2, 1L, 1000L)
                ),
                processed
            );
        }
    }

    @Test
    public void shouldReduceWindowed() {
        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        windowedStream
            .emitStrategy(emitStrategy)
            .reduce(MockReducer.STRING_ADDER)
            .toStream()
            .process(supplier);

@@ -102,54 +158,81 @@ public class TimeWindowedKStreamImplTest {
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
        }
        assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
                .get(new Windowed<>("1", new TimeWindow(0L, 500L))),
            equalTo(ValueAndTimestamp.make("1+2", 15L)));
        assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
                .get(new Windowed<>("2", new TimeWindow(500L, 1000L))),
            equalTo(ValueAndTimestamp.make("10+20", 550L)));
        assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
                .get(new Windowed<>("1", new TimeWindow(500L, 1000L))),
            equalTo(ValueAndTimestamp.make("3", 500L)));

        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> processed = supplier.theCapturedProcessor().processed();
        if (emitFinal) {
            assertEquals(
                asList(
                    new KeyValueTimestamp<>(KEY_1_WINDOW_0, "1+2", 15L),
                    new KeyValueTimestamp<>(KEY_1_WINDOW_1, "3", 500L),
                    new KeyValueTimestamp<>(KEY_2_WINDOW_1, "10+20", 550L)
                ),
                processed
            );
        } else {
            assertEquals(
                asList(
                    new KeyValueTimestamp<>(KEY_1_WINDOW_0, "1", 10L),
                    new KeyValueTimestamp<>(KEY_1_WINDOW_0, "1+2", 15L),
                    new KeyValueTimestamp<>(KEY_1_WINDOW_1, "3", 500L),
                    new KeyValueTimestamp<>(KEY_2_WINDOW_1, "10", 550L),
                    new KeyValueTimestamp<>(KEY_2_WINDOW_1, "10+20", 550L),
                    new KeyValueTimestamp<>(KEY_2_WINDOW_2, "30", 1000L)
                ),
                processed
            );
        }
    }

    @Test
    public void shouldAggregateWindowed() {
        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        windowedStream
            .emitStrategy(emitStrategy)
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
-               Materialized.with(Serdes.String(), Serdes.String()))
+               setMaterializedCache(Materialized.with(Serdes.String(), Serdes.String())))
            .toStream()
            .process(supplier);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
        }
        assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
                .get(new Windowed<>("1", new TimeWindow(0L, 500L))),
            equalTo(ValueAndTimestamp.make("0+1+2", 15L)));
        assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
                .get(new Windowed<>("2", new TimeWindow(500L, 1000L))),
            equalTo(ValueAndTimestamp.make("0+10+20", 550L)));
        assertThat(
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
                .get(new Windowed<>("1", new TimeWindow(500L, 1000L))),
            equalTo(ValueAndTimestamp.make("0+3", 500L)));

        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> processed = supplier.theCapturedProcessor().processed();
        if (emitFinal) {
            assertEquals(
                asList(
                    new KeyValueTimestamp<>(KEY_1_WINDOW_0, "0+1+2", 15L),
                    new KeyValueTimestamp<>(KEY_1_WINDOW_1, "0+3", 500L),
                    new KeyValueTimestamp<>(KEY_2_WINDOW_1, "0+10+20", 550L)
                ),
                processed
            );
        } else {
            assertEquals(
                asList(
                    new KeyValueTimestamp<>(KEY_1_WINDOW_0, "0+1", 10L),
                    new KeyValueTimestamp<>(KEY_1_WINDOW_0, "0+1+2", 15L),
                    new KeyValueTimestamp<>(KEY_1_WINDOW_1, "0+3", 500L),
                    new KeyValueTimestamp<>(KEY_2_WINDOW_1, "0+10", 550L),
                    new KeyValueTimestamp<>(KEY_2_WINDOW_1, "0+10+20", 550L),
                    new KeyValueTimestamp<>(KEY_2_WINDOW_2, "0+30", 1000L)
                ),
                processed
            );
        }
    }

    @Test
    public void shouldMaterializeCount() {
-       windowedStream.count(
-           Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
-               .withKeySerde(Serdes.String())
-               .withValueSerde(Serdes.Long()));
+       windowedStream
+           .emitStrategy(emitStrategy)
+           .count(
+               setMaterializedCache(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
+                   .withKeySerde(Serdes.String())
+                   .withValueSerde(Serdes.Long())));

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
@@ -158,10 +241,11 @@ public class TimeWindowedKStreamImplTest {
                final List<KeyValue<Windowed<String>, Long>> data =
                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));

-               assertThat(data, equalTo(Arrays.asList(
+               assertThat(data, equalTo(asList(
                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
-                   KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L))));
+                   KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L),
+                   KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L))));
            }
            {
                final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
@@ -169,10 +253,11 @@ public class TimeWindowedKStreamImplTest {
                final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));

-               assertThat(data, equalTo(Arrays.asList(
+               assertThat(data, equalTo(asList(
                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make(2L, 15L)),
                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)),
-                   KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)))));
+                   KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)),
+                   KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
            }
        }
    }

@@ -181,9 +266,9 @@ public class TimeWindowedKStreamImplTest {
    public void shouldMaterializeReduced() {
        windowedStream.reduce(
            MockReducer.STRING_ADDER,
-           Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
+           setMaterializedCache(Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduced")
                .withKeySerde(Serdes.String())
-               .withValueSerde(Serdes.String()));
+               .withValueSerde(Serdes.String())));

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
@@ -192,20 +277,22 @@ public class TimeWindowedKStreamImplTest {
                final List<KeyValue<Windowed<String>, String>> data =
                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));

-               assertThat(data, equalTo(Arrays.asList(
+               assertThat(data, equalTo(asList(
                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
-                   KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "10+20"))));
+                   KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "10+20"),
+                   KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "30"))));
            }
            {
                final WindowStore<String, ValueAndTimestamp<String>> windowStore = driver.getTimestampedWindowStore("reduced");
                final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));

-               assertThat(data, equalTo(Arrays.asList(
+               assertThat(data, equalTo(asList(
                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("1+2", 15L)),
                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("3", 500L)),
-                   KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("10+20", 550L)))));
+                   KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("10+20", 550L)),
+                   KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L)))));
            }
        }
    }

@@ -215,9 +302,9 @@ public class TimeWindowedKStreamImplTest {
        windowedStream.aggregate(
            MockInitializer.STRING_INIT,
            MockAggregator.TOSTRING_ADDER,
-           Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
+           setMaterializedCache(Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated")
                .withKeySerde(Serdes.String())
-               .withValueSerde(Serdes.String()));
+               .withValueSerde(Serdes.String())));

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
@@ -226,20 +313,22 @@ public class TimeWindowedKStreamImplTest {
                final List<KeyValue<Windowed<String>, String>> data =
                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));

-               assertThat(data, equalTo(Arrays.asList(
+               assertThat(data, equalTo(asList(
                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
-                   KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+10+20"))));
+                   KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+10+20"),
+                   KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "0+30"))));
            }
            {
                final WindowStore<String, ValueAndTimestamp<String>> windowStore = driver.getTimestampedWindowStore("aggregated");
                final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
                    StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));

-               assertThat(data, equalTo(Arrays.asList(
+               assertThat(data, equalTo(asList(
                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("0+1+2", 15L)),
                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+3", 500L)),
-                   KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+10+20", 550L)))));
+                   KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+10+20", 550L)),
+                   KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L)))));
            }
        }
    }

@@ -264,7 +353,7 @@ public class TimeWindowedKStreamImplTest {
        assertThrows(NullPointerException.class, () -> windowedStream.aggregate(
            null,
            MockAggregator.TOSTRING_ADDER,
-           Materialized.as("store")));
+           setMaterializedCache(Materialized.as("store"))));
    }

    @Test
@@ -272,7 +361,7 @@ public class TimeWindowedKStreamImplTest {
        assertThrows(NullPointerException.class, () -> windowedStream.aggregate(
            MockInitializer.STRING_INIT,
            null,
-           Materialized.as("store")));
+           setMaterializedCache(Materialized.as("store"))));
    }

    @SuppressWarnings("unchecked")
@@ -288,7 +377,7 @@ public class TimeWindowedKStreamImplTest {
    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
        assertThrows(NullPointerException.class, () -> windowedStream.reduce(
            null,
-           Materialized.as("store")));
+           setMaterializedCache(Materialized.as("store"))));
    }

    @Test
@@ -319,6 +408,13 @@ public class TimeWindowedKStreamImplTest {
inputTopic.pipeInput("1", "3", 500L);
|
||||
inputTopic.pipeInput("2", "10", 550L);
|
||||
inputTopic.pipeInput("2", "20", 500L);
|
||||
inputTopic.pipeInput("2", "30", 1000L);
|
||||
}
|
||||
|
||||
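    // Note (editor): shared helper so that every materialized store in this test honors the
    // withCache parameter without repeating the caching flag at each call site.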
    private <K, V, S extends StateStore> Materialized<K, V, S> setMaterializedCache(final Materialized<K, V, S> materialized) {
        if (withCache) {
            return materialized.withCachingEnabled();
        }
        return materialized.withCachingDisabled();
    }
}

@@ -92,7 +92,7 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
-public class TimeOrderedCachingPersistentWindowStoreTest {
+public class TimeOrderedWindowStoreTest {

    private static final int MAX_CACHE_SIZE_BYTES = 300;
    private static final long DEFAULT_TIMESTAMP = 10L;
@@ -246,7 +246,6 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
            context.forward(record);
        }

        @Override
        public void close() {
        }
@@ -890,7 +889,6 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
    public void shouldSkipNonExistBaseKeyInCache() {
        cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0);

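        // Note (editor): the time-ordered store appears to keep two representations: a
        // time-first base layout (TimeFirstWindowKeySchema) suited to range scans in
        // window-close order, and a key-first index (KeyFirstWindowKeySchema) pointing back
        // at it. Per the test name, a cached index entry whose base record does not exist
        // should be skipped during lookup.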
        final SegmentedCacheFunction baseCacheFunction = new SegmentedCacheFunction(new TimeFirstWindowKeySchema(), SEGMENT_INTERVAL);
        final SegmentedCacheFunction indexCacheFunction = new SegmentedCacheFunction(new KeyFirstWindowKeySchema(), SEGMENT_INTERVAL);

        final Bytes key = bytesKey("a");