Kafka 12960: Follow up Commit to filter expired records from Windowed/Session Stores (#12756)

KAFKA-12960: Enforcing strict retention time for WindowStore and SessionStore

Reviewers: Luke Chen <showuon@gmail.com>, Vicky Papavasileiou
This commit is contained in:
vamossagar12 2022-11-07 09:23:34 +05:30 committed by GitHub
parent b8754c074a
commit 7fd6a9b3e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 800 additions and 356 deletions

View File

@ -186,7 +186,8 @@ class KStreamImplJoin {
thisWindowStore.name(),
internalWindows,
joiner,
sharedTimeTracker
sharedTimeTracker,
windows.size() + windows.gracePeriodMs()
);
final PassThrough<K, VOut> joinMerge = new PassThrough<>();

View File

@ -41,6 +41,7 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
private final long joinThisAfterMs;
private final long joinOtherBeforeMs;
private final long joinOtherAfterMs;
private final long retentionPeriod;
private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
private final TimeTracker sharedTimeTracker;
@ -49,7 +50,8 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
final String windowName,
final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
final TimeTracker sharedTimeTracker) {
final TimeTracker sharedTimeTracker,
final long retentionPeriod) {
this.windowName = windowName;
this.joinThisBeforeMs = windows.beforeMs;
@ -58,6 +60,7 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
this.joinOtherAfterMs = windows.beforeMs;
this.joinerThis = joinerThis;
this.sharedTimeTracker = sharedTimeTracker;
this.retentionPeriod = retentionPeriod;
}
@Override
@ -93,6 +96,8 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
.withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value()))
.withTimestamp(inputRecordTimestamp);
sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
// We emit the self record only if it isn't expired.
final boolean emitSelfRecord = inputRecordTimestamp > sharedTimeTracker.streamTime - retentionPeriod + 1;
// Join current record with other
try (final WindowStoreIterator<V2> iter = windowStore.fetch(record.key(), timeFrom, timeTo)) {
@ -120,7 +125,7 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
// This is needed so that output records follow timestamp order
// Join this with self
if (inputRecordTimestamp < maxRecordTimestamp && !emittedJoinWithSelf) {
if (inputRecordTimestamp < maxRecordTimestamp && !emittedJoinWithSelf && emitSelfRecord) {
emittedJoinWithSelf = true;
context().forward(selfRecord);
}
@ -134,7 +139,7 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
}
// Join this with self
if (!emittedJoinWithSelf) {
if (!emittedJoinWithSelf && emitSelfRecord) {
context().forward(selfRecord);
}
}

View File

@ -52,6 +52,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
protected final AbstractSegments<S> segments;
protected final KeySchema baseKeySchema;
protected final Optional<KeySchema> indexKeySchema;
private final long retentionPeriod;
protected ProcessorContext context;
@ -66,22 +67,27 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
AbstractDualSchemaRocksDBSegmentedBytesStore(final String name,
final KeySchema baseKeySchema,
final Optional<KeySchema> indexKeySchema,
final AbstractSegments<S> segments) {
final AbstractSegments<S> segments,
final long retentionPeriod) {
this.name = name;
this.baseKeySchema = baseKeySchema;
this.indexKeySchema = indexKeySchema;
this.segments = segments;
this.retentionPeriod = retentionPeriod;
}
@Override
public KeyValueIterator<Bytes, byte[]> all() {
final long actualFrom = getActualFrom(0, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
final List<S> searchSpace = segments.allSegments(true);
final Bytes from = baseKeySchema.lowerRange(null, 0);
final Bytes from = baseKeySchema.lowerRange(null, actualFrom);
final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE);
return new SegmentIterator<>(
searchSpace.iterator(),
baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true),
baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true),
from,
to,
true);
@ -89,13 +95,16 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
@Override
public KeyValueIterator<Bytes, byte[]> backwardAll() {
final long actualFrom = getActualFrom(0, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
final List<S> searchSpace = segments.allSegments(false);
final Bytes from = baseKeySchema.lowerRange(null, 0);
final Bytes from = baseKeySchema.lowerRange(null, actualFrom);
final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE);
return new SegmentIterator<>(
searchSpace.iterator(),
baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, false),
baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false),
from,
to,
false);
@ -119,6 +128,15 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
abstract protected KeyValue<Bytes, byte[]> getIndexKeyValue(final Bytes baseKey, final byte[] baseValue);
// isTimeFirstWindowSchema true implies ON_WINDOW_CLOSE semantics. There's an edge case
// when retentionPeriod = grace Period. If we add 1, then actualFrom > to which would
// lead to no records being returned.
protected long getActualFrom(final long from, final boolean isTimeFirstWindowSchema) {
return isTimeFirstWindowSchema ? Math.max(from, observedStreamTime - retentionPeriod) :
Math.max(from, observedStreamTime - retentionPeriod + 1);
}
// For testing
void putIndex(final Bytes indexKey, final byte[] value) {
if (!hasIndex()) {
@ -191,7 +209,24 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
@Override
public byte[] get(final Bytes rawKey) {
final S segment = segments.getSegmentForTimestamp(baseKeySchema.segmentTimestamp(rawKey));
final long timestampFromRawKey = baseKeySchema.segmentTimestamp(rawKey);
// check if timestamp is expired
if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
if (timestampFromRawKey < observedStreamTime - retentionPeriod) {
LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
rawKey.toString(), timestampFromRawKey, observedStreamTime - retentionPeriod);
return null;
}
} else {
if (timestampFromRawKey < observedStreamTime - retentionPeriod + 1) {
LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
rawKey.toString(), timestampFromRawKey, observedStreamTime - retentionPeriod + 1);
return null;
}
}
final S segment = segments.getSegmentForTimestamp(timestampFromRawKey);
if (segment == null) {
return null;
}

View File

@ -52,6 +52,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
private final String name;
private final AbstractSegments<S> segments;
private final String metricScope;
private final long retentionPeriod;
private final KeySchema keySchema;
private ProcessorContext context;
@ -65,10 +66,12 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
AbstractRocksDBSegmentedBytesStore(final String name,
final String metricScope,
final long retentionPeriod,
final KeySchema keySchema,
final AbstractSegments<S> segments) {
this.name = name;
this.metricScope = metricScope;
this.retentionPeriod = retentionPeriod;
this.keySchema = keySchema;
this.segments = segments;
}
@ -91,19 +94,30 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
final long from,
final long to,
final boolean forward) {
final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);
final long actualFrom = getActualFrom(from);
final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
if (keySchema instanceof WindowKeySchema && to < actualFrom) {
LOG.debug("Returning no records for key {} as to ({}) < actualFrom ({}) ", key.toString(), to, actualFrom);
return KeyValueIterators.emptyIterator();
}
final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, to, forward);
final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, actualFrom);
final Bytes binaryTo = keySchema.upperRangeFixedSize(key, to);
return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(key, key, from, to, forward),
keySchema.hasNextCondition(key, key, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward);
}
private long getActualFrom(final long from) {
return Math.max(from, observedStreamTime - retentionPeriod + 1);
}
@Override
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo,
@ -133,14 +147,21 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
return KeyValueIterators.emptyIterator();
}
final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);
final long actualFrom = getActualFrom(from);
final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from);
if (keySchema instanceof WindowKeySchema && to < actualFrom) {
LOG.debug("Returning no records for keys {}/{} as to ({}) < actualFrom ({}) ", keyFrom, keyTo, to, actualFrom);
return KeyValueIterators.emptyIterator();
}
final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, to, forward);
final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, actualFrom);
final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to);
return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(keyFrom, keyTo, from, to, forward),
keySchema.hasNextCondition(keyFrom, keyTo, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward);
@ -148,11 +169,12 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
@Override
public KeyValueIterator<Bytes, byte[]> all() {
final List<S> searchSpace = segments.allSegments(true);
final long actualFrom = getActualFrom(0);
final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, Long.MAX_VALUE, true);
return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true),
keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true),
null,
null,
true);
@ -160,11 +182,13 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
@Override
public KeyValueIterator<Bytes, byte[]> backwardAll() {
final List<S> searchSpace = segments.allSegments(false);
final long actualFrom = getActualFrom(0);
final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, Long.MAX_VALUE, false);
return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, false),
keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false),
null,
null,
false);
@ -173,11 +197,18 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
@Override
public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
final List<S> searchSpace = segments.segments(timeFrom, timeTo, true);
final long actualFrom = getActualFrom(timeFrom);
if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", timeTo, actualFrom);
return KeyValueIterators.emptyIterator();
}
final List<S> searchSpace = segments.segments(actualFrom, timeTo, true);
return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(null, null, timeFrom, timeTo, true),
keySchema.hasNextCondition(null, null, actualFrom, timeTo, true),
null,
null,
true);
@ -186,11 +217,18 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
@Override
public KeyValueIterator<Bytes, byte[]> backwardFetchAll(final long timeFrom,
final long timeTo) {
final List<S> searchSpace = segments.segments(timeFrom, timeTo, false);
final long actualFrom = getActualFrom(timeFrom);
if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", timeTo, actualFrom);
return KeyValueIterators.emptyIterator();
}
final List<S> searchSpace = segments.segments(actualFrom, timeTo, false);
return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(null, null, timeFrom, timeTo, false),
keySchema.hasNextCondition(null, null, actualFrom, timeTo, false),
null,
null,
false);
@ -234,7 +272,14 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
@Override
public byte[] get(final Bytes key) {
final S segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
final long timestampFromKey = keySchema.segmentTimestamp(key);
// check if timestamp is expired
if (timestampFromKey < observedStreamTime - retentionPeriod + 1) {
LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
key.toString(), timestampFromKey, observedStreamTime - retentionPeriod + 1);
return null;
}
final S segment = segments.getSegmentForTimestamp(timestampFromKey);
if (segment == null) {
return null;
}

View File

@ -87,7 +87,7 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
cachedValue = get(baseKey);
if (cachedValue == null) {
// Key not in base store, inconsistency happened and remove from index.
// Key not in base store or key is expired, inconsistency happened and remove from index.
indexIterator.next();
AbstractRocksDBTimeOrderedSegmentedBytesStore.this.removeIndex(key);
} else {
@ -118,7 +118,7 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
final KeySchema baseKeySchema,
final Optional<KeySchema> indexKeySchema) {
super(name, baseKeySchema, indexKeySchema,
new KeyValueSegments(name, metricsScope, retention, segmentInterval));
new KeyValueSegments(name, metricsScope, retention, segmentInterval), retention);
}
@Override
@ -141,28 +141,38 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
final long from,
final long to,
final boolean forward) {
if (indexKeySchema.isPresent()) {
final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, from, to, forward);
final Bytes binaryFrom = indexKeySchema.get().lowerRangeFixedSize(key, from);
final long actualFrom = getActualFrom(from, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && to < actualFrom) {
return KeyValueIterators.emptyIterator();
}
if (indexKeySchema.isPresent()) {
final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
forward);
final Bytes binaryFrom = indexKeySchema.get().lowerRangeFixedSize(key, actualFrom);
final Bytes binaryTo = indexKeySchema.get().upperRangeFixedSize(key, to);
return getIndexToBaseStoreIterator(new SegmentIterator<>(
searchSpace.iterator(),
indexKeySchema.get().hasNextCondition(key, key, from, to, forward),
indexKeySchema.get().hasNextCondition(key, key, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward));
}
final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, from, to, forward);
final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key, from);
final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, actualFrom, to,
forward);
final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key, actualFrom);
final Bytes binaryTo = baseKeySchema.upperRangeFixedSize(key, to);
return new SegmentIterator<>(
searchSpace.iterator(),
baseKeySchema.hasNextCondition(key, key, from, to, forward),
baseKeySchema.hasNextCondition(key, key, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward);
@ -197,30 +207,36 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
return KeyValueIterators.emptyIterator();
}
final long actualFrom = getActualFrom(from, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && to < actualFrom) {
return KeyValueIterators.emptyIterator();
}
if (indexKeySchema.isPresent()) {
final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, from, to,
final List<KeyValueSegment> searchSpace = indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
forward);
final Bytes binaryFrom = indexKeySchema.get().lowerRange(keyFrom, from);
final Bytes binaryFrom = indexKeySchema.get().lowerRange(keyFrom, actualFrom);
final Bytes binaryTo = indexKeySchema.get().upperRange(keyTo, to);
return getIndexToBaseStoreIterator(new SegmentIterator<>(
searchSpace.iterator(),
indexKeySchema.get().hasNextCondition(keyFrom, keyTo, from, to, forward),
indexKeySchema.get().hasNextCondition(keyFrom, keyTo, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward));
}
final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, from, to,
final List<KeyValueSegment> searchSpace = baseKeySchema.segmentsToSearch(segments, actualFrom, to,
forward);
final Bytes binaryFrom = baseKeySchema.lowerRange(keyFrom, from);
final Bytes binaryFrom = baseKeySchema.lowerRange(keyFrom, actualFrom);
final Bytes binaryTo = baseKeySchema.upperRange(keyTo, to);
return new SegmentIterator<>(
searchSpace.iterator(),
baseKeySchema.hasNextCondition(keyFrom, keyTo, from, to, forward),
baseKeySchema.hasNextCondition(keyFrom, keyTo, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward);
@ -235,13 +251,20 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
@Override
public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
final List<KeyValueSegment> searchSpace = segments.segments(timeFrom, timeTo, true);
final Bytes binaryFrom = baseKeySchema.lowerRange(null, timeFrom);
final long actualFrom = getActualFrom(timeFrom, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && timeTo < actualFrom) {
return KeyValueIterators.emptyIterator();
}
final List<KeyValueSegment> searchSpace = segments.segments(actualFrom, timeTo, true);
final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom);
final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo);
return new SegmentIterator<>(
searchSpace.iterator(),
baseKeySchema.hasNextCondition(null, null, timeFrom, timeTo, true),
baseKeySchema.hasNextCondition(null, null, actualFrom, timeTo, true),
binaryFrom,
binaryTo,
true);
@ -250,13 +273,20 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
@Override
public KeyValueIterator<Bytes, byte[]> backwardFetchAll(final long timeFrom,
final long timeTo) {
final List<KeyValueSegment> searchSpace = segments.segments(timeFrom, timeTo, false);
final Bytes binaryFrom = baseKeySchema.lowerRange(null, timeFrom);
final long actualFrom = getActualFrom(timeFrom, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && timeTo < actualFrom) {
return KeyValueIterators.emptyIterator();
}
final List<KeyValueSegment> searchSpace = segments.segments(actualFrom, timeTo, false);
final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom);
final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo);
return new SegmentIterator<>(
searchSpace.iterator(),
baseKeySchema.hasNextCondition(null, null, timeFrom, timeTo, false),
baseKeySchema.hasNextCondition(null, null, actualFrom, timeTo, false),
binaryFrom,
binaryTo,
false);

View File

@ -23,6 +23,6 @@ public class RocksDBSegmentedBytesStore extends AbstractRocksDBSegmentedBytesSto
final long retention,
final long segmentInterval,
final KeySchema keySchema) {
super(name, metricsScope, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
super(name, metricsScope, retention, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
}
}

View File

@ -23,6 +23,6 @@ public class RocksDBTimestampedSegmentedBytesStore extends AbstractRocksDBSegmen
final long retention,
final long segmentInterval,
final KeySchema keySchema) {
super(name, metricsScope, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval));
super(name, metricsScope, retention, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval));
}
}

View File

@ -64,6 +64,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Collections;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
@ -195,12 +196,17 @@ public class TimeWindowedKStreamIntegrationTest {
startStreams();
// on window close
// observedStreamTime : 10, retentionPeriod: 10, actualFrom: 0, timeTo: 0, timeFrom: 0
// observedStreamTime : 15, retentionPeriod: 10, actualFrom: 5, timeTo: 5, timeFrom: 1
// observedStreamTime : 25, retentionPeriod: 10, actualFrom: 15, timeTo: 15, timeFrom: 6
final List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
new StringDeserializer(),
10L,
String.class,
emitFinal ? 6 : 12);
emitFinal ? 4 : 12);
final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
if (emitFinal) {
@ -208,8 +214,6 @@ public class TimeWindowedKStreamIntegrationTest {
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+6", 15)
);
} else {
@ -260,20 +264,22 @@ public class TimeWindowedKStreamIntegrationTest {
startStreams();
// on window close
// observedStreamTime : 15, retentionPeriod: 15, actualFrom: 0, timeTo: 0, timeFrom: 0
// observedStreamTime : 25, retentionPeriod: 15, actualFrom: 10, timeTo: 10, timeFrom: 1
final List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
new StringDeserializer(),
10L,
String.class,
emitFinal ? 6 : 13);
emitFinal ? 4 : 13);
final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
if (emitFinal) {
expectResult = asList(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+4", 6),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15)
);
@ -342,12 +348,13 @@ public class TimeWindowedKStreamIntegrationTest {
startStreams();
// ON_WINDOW_CLOSE expires all records.
List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp(
new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
new StringDeserializer(),
10L,
String.class,
emitFinal ? 5 : 9);
emitFinal ? 4 : 9);
List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
if (emitFinal) {
@ -358,8 +365,6 @@ public class TimeWindowedKStreamIntegrationTest {
5),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+L2,R2",
11),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)),
"0+L2,R2+L2,R2", 15),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)),
"0+L2,R2", 15)
);
@ -403,6 +408,20 @@ public class TimeWindowedKStreamIntegrationTest {
// Restart
startStreams();
if (emitFinal) {
windowedMessages = receiveMessagesWithTimestamp(
new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
new StringDeserializer(),
10L,
String.class,
1);
// Output just new/unexpired closed window for C
expectResult = Collections.singletonList(
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)),
"0+L3,R3", 25)
);
} else {
windowedMessages = receiveMessagesWithTimestamp(
new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
new StringDeserializer(),
@ -410,15 +429,6 @@ public class TimeWindowedKStreamIntegrationTest {
String.class,
2);
if (emitFinal) {
// Output just new closed window for C
expectResult = asList(
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(20L, 30L)),
"0+L3,R3", 25),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)),
"0+L3,R3", 25)
);
} else {
expectResult = asList(
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(30L, 40L)),
"0+L3,R3", 35),

View File

@ -1650,22 +1650,7 @@ public class KStreamSlidingWindowAggregateTest {
final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
if (emitFinal) {
expected.put(0L, ValueAndTimestamp.make("ARSTU", 10L));
expected.put(3L, ValueAndTimestamp.make("ASTU", 10L));
expected.put(4L, ValueAndTimestamp.make("ATU", 10L));
expected.put(5L, ValueAndTimestamp.make("ABTU", 15L));
expected.put(6L, ValueAndTimestamp.make("ABCU", 16L));
expected.put(8L, ValueAndTimestamp.make("ABCDU", 18L));
expected.put(9L, ValueAndTimestamp.make("ABCD", 18L));
expected.put(11L, ValueAndTimestamp.make("BCD", 18L));
expected.put(16L, ValueAndTimestamp.make("CD", 18L));
expected.put(17L, ValueAndTimestamp.make("D", 18L));
expected.put(20L, ValueAndTimestamp.make("E", 30L));
expected.put(30L, ValueAndTimestamp.make("EF", 40L));
expected.put(31L, ValueAndTimestamp.make("F", 40L));
expected.put(45L, ValueAndTimestamp.make("G", 55L));
expected.put(46L, ValueAndTimestamp.make("GH", 56L));
expected.put(48L, ValueAndTimestamp.make("GHIJ", 58L));
// only non-expired records
expected.put(52L, ValueAndTimestamp.make("GHIJK", 62L));
expected.put(53L, ValueAndTimestamp.make("GHIJKLMN", 63L));
expected.put(56L, ValueAndTimestamp.make("HIJKLMN", 63L));
@ -1675,6 +1660,7 @@ public class KStreamSlidingWindowAggregateTest {
expected.put(66L, ValueAndTimestamp.make("O", 76L));
expected.put(67L, ValueAndTimestamp.make("OP", 77L));
expected.put(70L, ValueAndTimestamp.make("OPQ", 80L));
} else {
expected.put(0L, ValueAndTimestamp.make("ARSTU", 10L));
expected.put(3L, ValueAndTimestamp.make("ASTU", 10L));

View File

@ -291,6 +291,7 @@ public class KStreamWindowAggregateTest {
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+3", 14),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+4", 12)
);
processors.get(1).checkAndClearProcessResult();
processors.get(2).checkAndClearProcessResult();
@ -301,6 +302,8 @@ public class KStreamWindowAggregateTest {
inputTopic2.pipeInput("A", "a", 15L);
processors.get(0).checkAndClearProcessResult();
if (withCache) {
processors.get(1).checkAndClearProcessResult(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 0),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), "0+b", 1),
@ -313,6 +316,10 @@ public class KStreamWindowAggregateTest {
"0+2%0+b", 1),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+3%0+c",
2));
} else {
processors.get(0).checkAndClearProcessResult();
processors.get(2).checkAndClearProcessResult();
}
inputTopic2.pipeInput("A", "a", 5L);
inputTopic2.pipeInput("B", "b", 6L);
@ -321,11 +328,23 @@ public class KStreamWindowAggregateTest {
inputTopic2.pipeInput("A", "a", 21L);
processors.get(0).checkAndClearProcessResult();
if (withCache) {
processors.get(1).checkAndClearProcessResult(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a", 5),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+b", 6),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+d+d", 10)
);
} else {
processors.get(1).checkAndClearProcessResult(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 0),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), "0+b", 1),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+c", 2),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a", 5),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+b", 6),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+d+d", 10)
);
}
processors.get(2).checkAndClearProcessResult(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1%0+a",
10),

View File

@ -233,12 +233,21 @@ public class SessionWindowedKStreamImplTest {
processData(driver);
final SessionStore<String, Long> store = driver.getSessionStore("count-store");
final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2"));
if (!emitFinal) {
assertThat(
data,
equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L))));
} else {
assertThat(
data,
equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L))));
}
}
}
@ -251,12 +260,21 @@ public class SessionWindowedKStreamImplTest {
final SessionStore<String, String> sessionStore = driver.getSessionStore("reduced");
final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
if (!emitFinal) {
assertThat(
data,
equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2"))));
} else {
assertThat(
data,
equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2"))));
}
}
}
@ -272,12 +290,21 @@ public class SessionWindowedKStreamImplTest {
processData(driver);
final SessionStore<String, String> sessionStore = driver.getSessionStore("aggregated");
final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
if (!emitFinal) {
assertThat(
data,
equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2"))));
} else {
assertThat(
data,
equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2"))));
}
}
}

View File

@ -51,6 +51,7 @@ import org.junit.Test;
import java.util.List;
import java.util.Properties;
import java.util.Collections;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
@ -198,6 +199,7 @@ public class TimeWindowedKStreamImplTest {
}
final ArrayList<KeyValueTimestamp<Windowed<String>, String>> processed = supplier.theCapturedProcessor().processed();
if (emitFinal) {
assertEquals(
asList(
@ -238,11 +240,26 @@ public class TimeWindowedKStreamImplTest {
final List<KeyValue<Windowed<String>, Long>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
if (withCache) {
// with cache returns all records (expired from underneath as well) as part of
// the merge process
assertThat(data, equalTo(asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L),
KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L))));
} else {
// without cache, we get only non-expired record from underlying store.
if (!emitFinal) {
assertThat(data, equalTo(Collections.singletonList(
KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L))));
} else {
assertThat(data, equalTo(asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L),
KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L))));
}
}
}
{
final WindowStore<String, ValueAndTimestamp<Long>> windowStore =
@ -250,11 +267,24 @@ public class TimeWindowedKStreamImplTest {
final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
// the same values and logic described above applies here as well.
if (withCache) {
assertThat(data, equalTo(asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make(2L, 15L)),
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)),
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)),
KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
} else {
if (!emitFinal) {
assertThat(data, equalTo(Collections.singletonList(
KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
} else {
assertThat(data, equalTo(asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)),
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)),
KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
}
}
}
}
}
@ -274,22 +304,37 @@ public class TimeWindowedKStreamImplTest {
final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
if (withCache) {
// with cache returns all records (expired from underneath as well) as part of
// the merge process
assertThat(data, equalTo(asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "10+20"),
KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "30"))));
} else {
// without cache, we get only non-expired record from underlying store.
// actualFrom = observedStreamTime(1500) - retentionPeriod(1000) + 1 = 501.
// only 1 record is non expired and would be returned.
assertThat(data, equalTo(Collections.singletonList(KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "30"))));
}
}
{
final WindowStore<String, ValueAndTimestamp<String>> windowStore = driver.getTimestampedWindowStore("reduced");
final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
// same logic/data as explained above.
if (withCache) {
assertThat(data, equalTo(asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("1+2", 15L)),
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("3", 500L)),
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("10+20", 550L)),
KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L)))));
} else {
assertThat(data, equalTo(Collections.singletonList(
KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L)))));
}
}
}
}
@ -310,22 +355,36 @@ public class TimeWindowedKStreamImplTest {
final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
if (withCache) {
// with cache returns all records (expired from underneath as well) as part of
// the merge process
assertThat(data, equalTo(asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+10+20"),
KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "0+30"))));
} else {
// without cache, we get only non-expired record from underlying store.
// actualFrom = observedStreamTime(1500) - retentionPeriod(1000) + 1 = 501.
// only 1 record is non expired and would be returned.
assertThat(data, equalTo(Collections
.singletonList(KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "0+30"))));
}
}
{
final WindowStore<String, ValueAndTimestamp<String>> windowStore = driver.getTimestampedWindowStore("aggregated");
final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L)));
if (withCache) {
assertThat(data, equalTo(asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("0+1+2", 15L)),
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+3", 500L)),
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+10+20", 550L)),
KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L)))));
} else {
assertThat(data, equalTo(Collections.singletonList(
KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L)))));
}
}
}
}

View File

@ -179,11 +179,10 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
);
// For all tests, actualFrom is computed using observedStreamTime - retention + 1.
// so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
// all records expired as actual from is 59001 and to is 1000
final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
assertEquals(expected, toList(values));
}
@ -191,11 +190,8 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
);
// all records expired as actual from is 59001 and to is 1000
final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
assertEquals(expected, toList(values));
}
@ -203,20 +199,16 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
);
// all records expired as actual from is 59001 and to is 1000
final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
assertEquals(expected, toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
// key B is expired as actual from is 59001
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
@ -226,10 +218,8 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
null, null, 0, windows[3].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
// keys A and B expired as actual from is 59001
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
@ -251,10 +241,10 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
);
// For all tests, actualFrom is computed using observedStreamTime - retention + 1.
// so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
// all records expired as actual from is 59001 and to = 1000
final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
assertEquals(expected, toList(values));
}
@ -262,11 +252,8 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
);
// all records expired as actual from is 59001 and to = 1000
final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
assertEquals(expected, toList(values));
}
@ -274,21 +261,17 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
);
// all records expired as actual from is 59001 and to = 1000
final List<KeyValue<Windowed<String>, Long>> expected = Collections.emptyList();
assertEquals(expected, toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
// only 1 record left as actual from is 59001 and to = 60,000
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
assertEquals(expected, toList(values));
@ -297,11 +280,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
null, null, 0, windows[3].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
// only 1 record left as actual from is 59001 and to = 60,000
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
assertEquals(expected, toList(values));
@ -854,18 +835,24 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), expectedValue3);
bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), expectedValue4);
// Record expired as timestampFromRawKey = 1000 while observedStreamTime = 60,000 and retention = 1000.
final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
key1, windows[0].start(), windows[0].end());
assertEquals(Bytes.wrap(value1), Bytes.wrap(expectedValue1));
assertNull(value1);
// Record expired as timestampFromRawKey = 1000 while observedStreamTime = 60,000 and retention = 1000.
final byte[] value2 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
key1, windows[1].start(), windows[1].end());
assertEquals(Bytes.wrap(value2), Bytes.wrap(expectedValue2));
assertNull(value2);
// expired record
// timestampFromRawKey = 1500 while observedStreamTime = 60,000 and retention = 1000.
final byte[] value3 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
key2, windows[2].start(), windows[2].end());
assertEquals(Bytes.wrap(value3), Bytes.wrap(expectedValue3));
assertNull(value3);
// only non-expired record
// timestampFromRawKey = 60,000 while observedStreamTime = 60,000 and retention = 1000.
final byte[] value4 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
key3, windows[3].start(), windows[3].end());
assertEquals(Bytes.wrap(value4), Bytes.wrap(expectedValue4));
@ -991,10 +978,26 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 1, 2000)) {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
final List<KeyValue<Windowed<String>, Long>> expected;
// actual from: observedStreamTime - retention + 1
if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
// For windowkeyschema, actual from is 1
// observed stream time = 1000. Retention Period = 1000.
// actual from = (1000 - 1000 + 1)
// and search happens in the range 1-2000
expected = asList(
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
KeyValue.pair(new Windowed<>(keyB, windows[2]), 20L)
);
} else {
// For session key schema, actual from is 501
// observed stream time = 1500. Retention Period = 1000.
// actual from = (1500 - 1000 + 1)
// and search happens in the range 501-2000
expected = Collections.singletonList(KeyValue.pair(new Windowed<>(keyB, windows[2]), 20L));
}
assertEquals(expected, toList(results));
}
@ -1010,11 +1013,27 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(10));
bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
// actual from: observedStreamTime - retention + 1
// retention = 1000
try (final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999)) {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
final List<KeyValue<Windowed<String>, Long>> expected;
// actual from: observedStreamTime - retention + 1
if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
// For windowkeyschema, actual from is 1
// observed stream time = 1000. actual from = (1000 - 1000 + 1)
// and search happens in the range 1-2000
expected = asList(
KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
);
} else {
// For session key schema, actual from is 501
// observed stream time = 1500. actual from = (1500 - 1000 + 1)
// and search happens in the range 501-2000 deeming first record as expired.
expected = Collections.singletonList(KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
}
assertEquals(expected, toList(results));
}
@ -1054,15 +1073,24 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
// For all tests, actualFrom is computed using observedStreamTime - retention + 1.
// so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
// don't return expired records.
assertEquals(
asList(
KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
KeyValue.pair(new Windowed<>(key, windows[2]), 500L)
),
Collections.emptyList(),
results
);
final List<KeyValue<Windowed<String>, Long>> results1 = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 59000, 60000));
// only non expired record as actual from is 59001
assertEquals(
Collections.singletonList(
KeyValue.pair(new Windowed<>(key, windows[3]), 1000L)
),
results1
);
segments.close();
}
@ -1086,9 +1114,11 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
);
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
// actualFrom is computed using observedStreamTime - retention + 1.
// so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
// only one record returned as actual from is 59001
assertEquals(
asList(
KeyValue.pair(new Windowed<>(keyA, windows[0]), 50L),
Collections.singletonList(
KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L)
),
results
@ -1115,12 +1145,13 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
),
segmentDirs()
);
// For all tests, actualFrom is computed using observedStreamTime - retention + 1.
// so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
// key A expired as actual from is 59,001
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.backwardAll());
assertEquals(
asList(
KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L),
KeyValue.pair(new Windowed<>(keyA, windows[0]), 50L)
Collections.singletonList(
KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L)
),
results
);
@ -1147,9 +1178,11 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
);
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60_000L));
// For all tests, actualFrom is computed using observedStreamTime - retention + 1.
// so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001
// only 1 record fetched as actual from is 59001
assertEquals(
asList(
KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
Collections.singletonList(
KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
),
results
@ -1277,9 +1310,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
assertEquals(2, bytesStore.getSegments().size());
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
// after restoration, only 1 record should be returned as actual from is 59001 and the prior record is expired.
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
assertEquals(expected, results);
}
@ -1332,10 +1365,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
final String key = "a";
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
// after restoration, only non expired segments should be returned which is one as actual from is 59001
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
assertEquals(expected, results);
assertThat(bytesStore.getPosition(), Matchers.notNullValue());
@ -1368,9 +1400,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
assertEquals(2, bytesStore.getSegments().size());
final String key = "a";
// only non expired record as actual from is 59001
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
@ -1407,7 +1439,15 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
assertEquals(1, bytesStore.getSegments().size());
final String key = "a";
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
// actual from = observedStreamTime - retention + 1.
// retention = 1000
if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
// For window stores, observedSteam = 1000 => actualFrom = 1
// For session stores, observedSteam = 1500 => actualFrom = 501 which deems
// the below record as expired.
expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
}
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
assertEquals(expected, results);

View File

@ -171,44 +171,30 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
);
assertEquals(expected, toList(values));
// All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
// for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
);
assertEquals(expected, toList(values));
// All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
// for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
);
assertEquals(expected, toList(values));
// All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
// for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
// Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
// for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
@ -217,11 +203,9 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
null, null, 0, windows[3].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
// Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
// for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
@ -242,44 +226,33 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
);
assertEquals(expected, toList(values));
// All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
// for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
);
assertEquals(expected, toList(values));
// All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
// for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
);
assertEquals(expected, toList(values));
// All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
// for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
// Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
// for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
assertEquals(expected, toList(values));
@ -287,12 +260,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.backwardFetch(
null, null, 0, windows[3].start())) {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
// Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1)
// for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000
final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
assertEquals(expected, toList(values));
@ -306,10 +277,18 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
try (final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999)) {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
);
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
/*
* For WindowKeySchema, the observedStreamTime is 1000 which means 1 extra record gets returned while for
* SessionKeySchema, it's 1500. Which changes the actual-from while fetching. In case of SessionKeySchema, the
* fetch happens from 501-999 while for WindowKeySchema it's from 1-999.
*/
if (schema instanceof SessionKeySchema) {
expected.add(KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
} else {
expected.add(KeyValue.pair(new Windowed<>(key, windows[0]), 10L));
expected.add(KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
}
assertEquals(expected, toList(results));
}
@ -341,16 +320,13 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1)), segmentDirs());
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
/*
* All records expired as observed stream time = 60,000 which sets actual-from to 59001(60,000 - 1000 + 1). to = 1500.
*/
assertEquals(
Arrays.asList(
KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
KeyValue.pair(new Windowed<>(key, windows[2]), 500L)
),
Collections.emptyList(),
results
);
segments.close();
}
@ -371,11 +347,12 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
),
segmentDirs()
);
/*
* Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX.
*/
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
assertEquals(
Arrays.asList(
KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
Collections.singletonList(
KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
),
results
@ -401,11 +378,12 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
),
segmentDirs()
);
/*
* Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = 60,000.
*/
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60_000L));
assertEquals(
Arrays.asList(
KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
Collections.singletonList(
KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
),
results
@ -529,8 +507,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
// 2 segments are created during restoration.
assertEquals(2, bytesStore.getSegments().size());
/*
* Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX.
*/
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
@ -584,9 +564,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
assertEquals(2, bytesStore.getSegments().size());
final String key = "a";
/*
* Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX.
*/
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
@ -621,9 +602,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
assertEquals(2, bytesStore.getSegments().size());
final String key = "a";
/*
* Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX.
*/
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
@ -659,11 +641,20 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
// 1 segments are created during restoration.
assertEquals(1, bytesStore.getSegments().size());
final String key = "a";
/*
* For WindowKeySchema, the observedStreamTime is 1000 which means 1 extra record gets returned while for
* SessionKeySchema, it's 1500. Which changes the actual-from while fetching. In case of SessionKeySchema, the
* fetch happens from 501 to end while for WindowKeySchema it's from 1 to end.
*/
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
if (schema instanceof SessionKeySchema) {
assertEquals(Collections.emptyList(), results);
} else {
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
assertEquals(expected, results);
}
assertThat(bytesStore.getPosition(), Matchers.notNullValue());
assertThat(bytesStore.getPosition().getPartitionPositions("A"), hasEntry(0, 2L));
}

View File

@ -45,6 +45,7 @@ import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import java.util.ArrayList;
import java.util.Arrays;
@ -58,6 +59,7 @@ import java.util.Properties;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.utils.Utils.toList;
import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
import static org.hamcrest.CoreMatchers.equalTo;
@ -913,7 +915,13 @@ public abstract class AbstractSessionBytesStoreTest {
try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
) {
if (getStoreType() == StoreType.InMemoryStore) {
assertEquals(valuesToSet(iterator), new HashSet<>(Arrays.asList(2L, 3L, 4L)));
} else {
// The 2 records with values 2L and 3L are considered expired as
// their end times < observed stream time - retentionPeriod + 1.
Assertions.assertEquals(valuesToSet(iterator), new HashSet<>(Collections.singletonList(4L)));
}
}
}
@ -934,4 +942,43 @@ public abstract class AbstractSessionBytesStoreTest {
final Position actual = sessionStore.getPosition();
assertThat(expected, is(actual));
}
@Test
public void shouldNotFetchExpiredSessions() {
final long systemTime = Time.SYSTEM.milliseconds();
sessionStore.put(new Windowed<>("p", new SessionWindow(systemTime - 3 * RETENTION_PERIOD, systemTime - 2 * RETENTION_PERIOD)), 1L);
sessionStore.put(new Windowed<>("q", new SessionWindow(systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)), 4L);
sessionStore.put(new Windowed<>("r", new SessionWindow(systemTime - RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)), 3L);
sessionStore.put(new Windowed<>("p", new SessionWindow(systemTime - RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)), 2L);
try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("p", systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)
) {
Assertions.assertEquals(mkSet(2L), valuesToSet(iterator));
}
try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.backwardFindSessions("p", systemTime - 5 * RETENTION_PERIOD, systemTime - 4 * RETENTION_PERIOD)
) {
Assertions.assertFalse(iterator.hasNext());
}
try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("p", "r", systemTime - 5 * RETENTION_PERIOD, systemTime - 4 * RETENTION_PERIOD)
) {
Assertions.assertFalse(iterator.hasNext());
}
try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("p", "r", systemTime - RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)
) {
Assertions.assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L));
}
try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("p", "r", systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)
) {
Assertions.assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L));
}
try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.backwardFindSessions("p", "r", systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)
) {
Assertions.assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L));
}
}
}

View File

@ -342,7 +342,7 @@ public abstract class AbstractWindowBytesStoreTest {
);
assertEquals(
asList(zero, one, two, three),
toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3)))
toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime), ofEpochMilli(defaultStartTime + 3)))
);
assertEquals(
asList(one, two, three, four, five),
@ -360,7 +360,7 @@ public abstract class AbstractWindowBytesStoreTest {
);
assertEquals(
asList(three, two, one, zero),
toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3)))
toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime), ofEpochMilli(defaultStartTime + 3)))
);
assertEquals(
asList(five, four, three, two, one),

View File

@ -101,7 +101,8 @@ public class CachingPersistentWindowStoreTest {
@Before
public void setUp() {
keySchema = new WindowKeySchema();
bytesStore = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0, SEGMENT_INTERVAL, keySchema);
///KAFKA-12960: Adding a retention of 100 ms to make all test cases work as is.
bytesStore = new RocksDBSegmentedBytesStore("test", "metrics-scope", 100, SEGMENT_INTERVAL, keySchema);
underlyingStore = new RocksDBWindowStore(bytesStore, false, WINDOW_SIZE);
final TimeWindowedDeserializer<String> keyDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE);
keyDeserializer.setIsChangelogTopic(true);

View File

@ -95,6 +95,7 @@ public class MeteredSessionStoreTest {
private static final byte[] VALUE_BYTES = VALUE.getBytes();
private static final long START_TIMESTAMP = 24L;
private static final long END_TIMESTAMP = 42L;
private static final int RETENTION_PERIOD = 100;
private final String threadId = Thread.currentThread().getName();
private final TaskId taskId = new TaskId(0, 0, "My-Topology");
@ -429,6 +430,54 @@ public class MeteredSessionStoreTest {
verify(innerStore);
}
@Test
public void shouldReturnNoSessionsWhenFetchedKeyHasExpired() {
final long systemTime = Time.SYSTEM.milliseconds();
expect(innerStore.findSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime))
.andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
init();
final KeyValueIterator<Windowed<String>, String> iterator = store.findSessions(KEY, systemTime - RETENTION_PERIOD, systemTime);
assertFalse(iterator.hasNext());
iterator.close();
}
@Test
public void shouldReturnNoSessionsInBackwardOrderWhenFetchedKeyHasExpired() {
final long systemTime = Time.SYSTEM.milliseconds();
expect(innerStore.backwardFindSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime))
.andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
init();
final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFindSessions(KEY, systemTime - RETENTION_PERIOD, systemTime);
assertFalse(iterator.hasNext());
iterator.close();
}
@Test
public void shouldNotFindExpiredSessionRangeFromStore() {
final long systemTime = Time.SYSTEM.milliseconds();
expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime))
.andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
init();
final KeyValueIterator<Windowed<String>, String> iterator = store.findSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime);
assertFalse(iterator.hasNext());
iterator.close();
}
@Test
public void shouldNotFindExpiredSessionRangeInBackwardOrderFromStore() {
final long systemTime = Time.SYSTEM.milliseconds();
expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime))
.andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
init();
final KeyValueIterator<Windowed<String>, String> iterator = store.backwardFindSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime);
assertFalse(iterator.hasNext());
iterator.close();
}
@Test
public void shouldRecordRestoreTimeOnInit() {
init();

View File

@ -45,6 +45,7 @@ import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -82,6 +83,7 @@ public class MeteredWindowStoreTest {
private static final String VALUE = "value";
private static final byte[] VALUE_BYTES = VALUE.getBytes();
private static final int WINDOW_SIZE_MS = 10;
private static final int RETENTION_PERIOD = 100;
private static final long TIMESTAMP = 42L;
private final String threadId = Thread.currentThread().getName();
@ -270,6 +272,18 @@ public class MeteredWindowStoreTest {
verify(innerStoreMock);
}
@Test
public void shouldReturnNoRecordWhenFetchedKeyHasExpired() {
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1 + RETENTION_PERIOD))
.andReturn(KeyValueIterators.emptyWindowStoreIterator());
replay(innerStoreMock);
store.init((StateStoreContext) context, store);
store.fetch("a", ofEpochMilli(1), ofEpochMilli(1).plus(RETENTION_PERIOD, ChronoUnit.MILLIS)).close(); // recorded on close;
verify(innerStoreMock);
}
@Test
public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() {
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1))

View File

@ -187,19 +187,40 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
),
segmentDirs(baseDir)
);
// For all tests, for WindowStore actualFrom is computed using observedStreamTime - retention + 1.
// while for TimeOrderedWindowStores, actualFrom = observedStreamTime - retention
// expired record
assertEquals(
new HashSet<>(Collections.singletonList("zero")),
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
0,
ofEpochMilli(startTime - WINDOW_SIZE),
ofEpochMilli(startTime + WINDOW_SIZE))));
// RocksDbWindwStore =>
// from = 149997
// to = 150003
// actualFrom = 150001
// record one timestamp is 150,000 So, it's ignored.
// RocksDBTimeOrderedWindowStore*Index =>
// from = 149997
// to = 150003
// actualFrom = 150000, hence not ignored
if (storeType == StoreType.RocksDBWindowStore) {
assertEquals(
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
1,
ofEpochMilli(startTime + increment - WINDOW_SIZE),
ofEpochMilli(startTime + increment + WINDOW_SIZE))));
} else {
assertEquals(
new HashSet<>(Collections.singletonList("one")),
valuesToSet(windowStore.fetch(
1,
ofEpochMilli(startTime + increment - WINDOW_SIZE),
ofEpochMilli(startTime + increment + WINDOW_SIZE))));
}
assertEquals(
new HashSet<>(Collections.singletonList("two")),
valuesToSet(windowStore.fetch(
@ -247,12 +268,32 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
1,
ofEpochMilli(startTime + increment - WINDOW_SIZE),
ofEpochMilli(startTime + increment + WINDOW_SIZE))));
// RocksDbWindwStore =>
// from = 179997
// to = 180003
// actualFrom = 170001
// record one timestamp is 180,000 So, it's ignored.
// RocksDBTimeOrderedWindowStore*Index =>
// from = 179997
// to = 180003
// actualFrom = 180000, hence not ignored
if (storeType == StoreType.RocksDBWindowStore) {
assertEquals(
// expired record
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
2,
ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 2 + WINDOW_SIZE))));
} else {
assertEquals(
// expired record
new HashSet<>(Collections.singletonList("two")),
valuesToSet(windowStore.fetch(
2,
ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 2 + WINDOW_SIZE))));
}
assertEquals(
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
@ -301,7 +342,8 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
ofEpochMilli(startTime + increment - WINDOW_SIZE),
ofEpochMilli(startTime + increment + WINDOW_SIZE))));
assertEquals(
new HashSet<>(Collections.singletonList("two")),
// expired record
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
2,
ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE),
@ -371,12 +413,24 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
3,
ofEpochMilli(startTime + increment * 3 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 3 + WINDOW_SIZE))));
if (storeType == StoreType.RocksDBWindowStore) {
assertEquals(
// expired record
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
4,
ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
} else {
assertEquals(
// expired record
new HashSet<>(Collections.singletonList("four")),
valuesToSet(windowStore.fetch(
4,
ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
}
assertEquals(
new HashSet<>(Collections.singletonList("five")),
valuesToSet(windowStore.fetch(
@ -465,7 +519,14 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
iter.next();
fetchedCount++;
}
// 1 extra record is expired in the case of RocksDBWindowStore as
// actualFrom = observedStreamTime - retentionPeriod + 1. The +1
// isn't present for RocksDbTimeOrderedStoreWith*Index
if (storeType == StoreType.RocksDBWindowStore) {
assertEquals(1, fetchedCount);
} else {
assertEquals(2, fetchedCount);
}
assertEquals(
Utils.mkSet(segments.segmentName(1L), segments.segmentName(3L)),
@ -480,8 +541,10 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
iter.next();
fetchedCount++;
}
assertEquals(1, fetchedCount);
// the latest record has a timestamp > 60k. So, the +1 in actualFrom calculation in
// RocksDbWindowStore shouldn't have an implciation and all stores should return the same fetched counts.
assertEquals(1, fetchedCount);
assertEquals(
Utils.mkSet(segments.segmentName(3L), segments.segmentName(5L)),
segmentDirs(baseDir)
@ -564,6 +627,9 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
Serdes.String());
windowStore.init((StateStoreContext) context, windowStore);
// For all tests, for WindowStore actualFrom is computed using observedStreamTime - retention + 1.
// while for TimeOrderedWindowStores, actualFrom = observedStreamTime - retention
assertEquals(
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
@ -650,12 +716,31 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
3,
ofEpochMilli(startTime + increment * 3 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 3 + WINDOW_SIZE))));
// RocksDbWindwStore =>
// from = 239,997
// to = 240,003
// actualFrom = 240,001
// record four timestamp is 240,000 So, it's ignored.
// RocksDBTimeOrderedWindowStore*Index =>
// from = 239,997
// to = 240,003
// actualFrom = 240,000, hence not ignored
if (storeType == StoreType.RocksDBWindowStore) {
assertEquals(
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
4,
ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
} else {
assertEquals(
new HashSet<>(Collections.singletonList("four")),
valuesToSet(windowStore.fetch(
4,
ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
}
assertEquals(
new HashSet<>(Collections.singletonList("five")),
valuesToSet(windowStore.fetch(