diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java index 32a2a4d436f..b2405db7317 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java @@ -186,7 +186,8 @@ class KStreamImplJoin { thisWindowStore.name(), internalWindows, joiner, - sharedTimeTracker + sharedTimeTracker, + windows.size() + windows.gracePeriodMs() ); final PassThrough joinMerge = new PassThrough<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java index f3ce64dafeb..a6af2a4e082 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java @@ -41,6 +41,7 @@ class KStreamKStreamSelfJoin implements ProcessorSupplier joinerThis; private final TimeTracker sharedTimeTracker; @@ -49,7 +50,8 @@ class KStreamKStreamSelfJoin implements ProcessorSupplier joinerThis, - final TimeTracker sharedTimeTracker) { + final TimeTracker sharedTimeTracker, + final long retentionPeriod) { this.windowName = windowName; this.joinThisBeforeMs = windows.beforeMs; @@ -58,6 +60,7 @@ class KStreamKStreamSelfJoin implements ProcessorSupplier implements ProcessorSupplier sharedTimeTracker.streamTime - retentionPeriod + 1; // Join current record with other try (final WindowStoreIterator iter = windowStore.fetch(record.key(), timeFrom, timeTo)) { @@ -120,7 +125,7 @@ class KStreamKStreamSelfJoin implements ProcessorSupplier implements ProcessorSupplier segments; protected final KeySchema baseKeySchema; protected final Optional indexKeySchema; + private final long 
retentionPeriod; protected ProcessorContext context; @@ -66,22 +67,27 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore indexKeySchema, - final AbstractSegments segments) { + final AbstractSegments segments, + final long retentionPeriod) { this.name = name; this.baseKeySchema = baseKeySchema; this.indexKeySchema = indexKeySchema; this.segments = segments; + this.retentionPeriod = retentionPeriod; } @Override public KeyValueIterator all() { + + final long actualFrom = getActualFrom(0, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema); + final List searchSpace = segments.allSegments(true); - final Bytes from = baseKeySchema.lowerRange(null, 0); + final Bytes from = baseKeySchema.lowerRange(null, actualFrom); final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE); return new SegmentIterator<>( searchSpace.iterator(), - baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true), + baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true), from, to, true); @@ -89,13 +95,16 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore backwardAll() { + + final long actualFrom = getActualFrom(0, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema); + final List searchSpace = segments.allSegments(false); - final Bytes from = baseKeySchema.lowerRange(null, 0); + final Bytes from = baseKeySchema.lowerRange(null, actualFrom); final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE); return new SegmentIterator<>( searchSpace.iterator(), - baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, false), + baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false), from, to, false); @@ -119,6 +128,15 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore getIndexKeyValue(final Bytes baseKey, final byte[] baseValue); + // isTimeFirstWindowSchema true implies ON_WINDOW_CLOSE semantics. 
There's an edge case + // when retentionPeriod = grace Period. If we add 1, then actualFrom > to which would + // lead to no records being returned. + protected long getActualFrom(final long from, final boolean isTimeFirstWindowSchema) { + return isTimeFirstWindowSchema ? Math.max(from, observedStreamTime - retentionPeriod) : + Math.max(from, observedStreamTime - retentionPeriod + 1); + + } + // For testing void putIndex(final Bytes indexKey, final byte[] value) { if (!hasIndex()) { @@ -191,7 +209,24 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore implements Se private final String name; private final AbstractSegments segments; private final String metricScope; + private final long retentionPeriod; private final KeySchema keySchema; private ProcessorContext context; @@ -65,10 +66,12 @@ public class AbstractRocksDBSegmentedBytesStore implements Se AbstractRocksDBSegmentedBytesStore(final String name, final String metricScope, + final long retentionPeriod, final KeySchema keySchema, final AbstractSegments segments) { this.name = name; this.metricScope = metricScope; + this.retentionPeriod = retentionPeriod; this.keySchema = keySchema; this.segments = segments; } @@ -91,19 +94,30 @@ public class AbstractRocksDBSegmentedBytesStore implements Se final long from, final long to, final boolean forward) { - final List searchSpace = keySchema.segmentsToSearch(segments, from, to, forward); + final long actualFrom = getActualFrom(from); - final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from); + if (keySchema instanceof WindowKeySchema && to < actualFrom) { + LOG.debug("Returning no records for key {} as to ({}) < actualFrom ({}) ", key.toString(), to, actualFrom); + return KeyValueIterators.emptyIterator(); + } + + final List searchSpace = keySchema.segmentsToSearch(segments, actualFrom, to, forward); + + final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, actualFrom); final Bytes binaryTo = keySchema.upperRangeFixedSize(key, to); 
return new SegmentIterator<>( searchSpace.iterator(), - keySchema.hasNextCondition(key, key, from, to, forward), + keySchema.hasNextCondition(key, key, actualFrom, to, forward), binaryFrom, binaryTo, forward); } + private long getActualFrom(final long from) { + return Math.max(from, observedStreamTime - retentionPeriod + 1); + } + @Override public KeyValueIterator fetch(final Bytes keyFrom, final Bytes keyTo, @@ -133,14 +147,21 @@ public class AbstractRocksDBSegmentedBytesStore implements Se return KeyValueIterators.emptyIterator(); } - final List searchSpace = keySchema.segmentsToSearch(segments, from, to, forward); + final long actualFrom = getActualFrom(from); - final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from); + if (keySchema instanceof WindowKeySchema && to < actualFrom) { + LOG.debug("Returning no records for keys {}/{} as to ({}) < actualFrom ({}) ", keyFrom, keyTo, to, actualFrom); + return KeyValueIterators.emptyIterator(); + } + + final List searchSpace = keySchema.segmentsToSearch(segments, actualFrom, to, forward); + + final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, actualFrom); final Bytes binaryTo = keyTo == null ? 
null : keySchema.upperRange(keyTo, to); return new SegmentIterator<>( searchSpace.iterator(), - keySchema.hasNextCondition(keyFrom, keyTo, from, to, forward), + keySchema.hasNextCondition(keyFrom, keyTo, actualFrom, to, forward), binaryFrom, binaryTo, forward); @@ -148,11 +169,12 @@ public class AbstractRocksDBSegmentedBytesStore implements Se @Override public KeyValueIterator all() { - final List searchSpace = segments.allSegments(true); + final long actualFrom = getActualFrom(0); + final List searchSpace = keySchema.segmentsToSearch(segments, actualFrom, Long.MAX_VALUE, true); return new SegmentIterator<>( searchSpace.iterator(), - keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true), + keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true), null, null, true); @@ -160,11 +182,13 @@ public class AbstractRocksDBSegmentedBytesStore implements Se @Override public KeyValueIterator backwardAll() { - final List searchSpace = segments.allSegments(false); + final long actualFrom = getActualFrom(0); + + final List searchSpace = keySchema.segmentsToSearch(segments, actualFrom, Long.MAX_VALUE, false); return new SegmentIterator<>( searchSpace.iterator(), - keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, false), + keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false), null, null, false); @@ -173,11 +197,18 @@ public class AbstractRocksDBSegmentedBytesStore implements Se @Override public KeyValueIterator fetchAll(final long timeFrom, final long timeTo) { - final List searchSpace = segments.segments(timeFrom, timeTo, true); + final long actualFrom = getActualFrom(timeFrom); + + if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) { + LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", timeTo, actualFrom); + return KeyValueIterators.emptyIterator(); + } + + final List searchSpace = segments.segments(actualFrom, timeTo, true); return new SegmentIterator<>( searchSpace.iterator(), - 
keySchema.hasNextCondition(null, null, timeFrom, timeTo, true), + keySchema.hasNextCondition(null, null, actualFrom, timeTo, true), null, null, true); @@ -186,11 +217,18 @@ public class AbstractRocksDBSegmentedBytesStore implements Se @Override public KeyValueIterator backwardFetchAll(final long timeFrom, final long timeTo) { - final List searchSpace = segments.segments(timeFrom, timeTo, false); + final long actualFrom = getActualFrom(timeFrom); + + if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) { + LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", timeTo, actualFrom); + return KeyValueIterators.emptyIterator(); + } + + final List searchSpace = segments.segments(actualFrom, timeTo, false); return new SegmentIterator<>( searchSpace.iterator(), - keySchema.hasNextCondition(null, null, timeFrom, timeTo, false), + keySchema.hasNextCondition(null, null, actualFrom, timeTo, false), null, null, false); @@ -234,7 +272,14 @@ public class AbstractRocksDBSegmentedBytesStore implements Se @Override public byte[] get(final Bytes key) { - final S segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key)); + final long timestampFromKey = keySchema.segmentTimestamp(key); + // check if timestamp is expired + if (timestampFromKey < observedStreamTime - retentionPeriod + 1) { + LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})", + key.toString(), timestampFromKey, observedStreamTime - retentionPeriod + 1); + return null; + } + final S segment = segments.getSegmentForTimestamp(timestampFromKey); if (segment == null) { return null; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java index 0398f0ca060..f8217c6d066 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java @@ -87,7 +87,7 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst cachedValue = get(baseKey); if (cachedValue == null) { - // Key not in base store, inconsistency happened and remove from index. + // Key not in base store or key is expired, inconsistency happened and remove from index. indexIterator.next(); AbstractRocksDBTimeOrderedSegmentedBytesStore.this.removeIndex(key); } else { @@ -118,7 +118,7 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst final KeySchema baseKeySchema, final Optional indexKeySchema) { super(name, baseKeySchema, indexKeySchema, - new KeyValueSegments(name, metricsScope, retention, segmentInterval)); + new KeyValueSegments(name, metricsScope, retention, segmentInterval), retention); } @Override @@ -141,28 +141,38 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst final long from, final long to, final boolean forward) { - if (indexKeySchema.isPresent()) { - final List searchSpace = indexKeySchema.get().segmentsToSearch(segments, from, to, forward); - final Bytes binaryFrom = indexKeySchema.get().lowerRangeFixedSize(key, from); + final long actualFrom = getActualFrom(from, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema); + + if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && to < actualFrom) { + return KeyValueIterators.emptyIterator(); + } + + if (indexKeySchema.isPresent()) { + final List searchSpace = indexKeySchema.get().segmentsToSearch(segments, actualFrom, to, + forward); + + final Bytes binaryFrom = indexKeySchema.get().lowerRangeFixedSize(key, actualFrom); final Bytes binaryTo = indexKeySchema.get().upperRangeFixedSize(key, to); return 
getIndexToBaseStoreIterator(new SegmentIterator<>( searchSpace.iterator(), - indexKeySchema.get().hasNextCondition(key, key, from, to, forward), + indexKeySchema.get().hasNextCondition(key, key, actualFrom, to, forward), binaryFrom, binaryTo, forward)); } - final List searchSpace = baseKeySchema.segmentsToSearch(segments, from, to, forward); - final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key, from); + final List searchSpace = baseKeySchema.segmentsToSearch(segments, actualFrom, to, + forward); + + final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key, actualFrom); final Bytes binaryTo = baseKeySchema.upperRangeFixedSize(key, to); return new SegmentIterator<>( searchSpace.iterator(), - baseKeySchema.hasNextCondition(key, key, from, to, forward), + baseKeySchema.hasNextCondition(key, key, actualFrom, to, forward), binaryFrom, binaryTo, forward); @@ -197,30 +207,36 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst return KeyValueIterators.emptyIterator(); } + final long actualFrom = getActualFrom(from, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema); + + if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && to < actualFrom) { + return KeyValueIterators.emptyIterator(); + } + if (indexKeySchema.isPresent()) { - final List searchSpace = indexKeySchema.get().segmentsToSearch(segments, from, to, + final List searchSpace = indexKeySchema.get().segmentsToSearch(segments, actualFrom, to, forward); - final Bytes binaryFrom = indexKeySchema.get().lowerRange(keyFrom, from); + final Bytes binaryFrom = indexKeySchema.get().lowerRange(keyFrom, actualFrom); final Bytes binaryTo = indexKeySchema.get().upperRange(keyTo, to); return getIndexToBaseStoreIterator(new SegmentIterator<>( searchSpace.iterator(), - indexKeySchema.get().hasNextCondition(keyFrom, keyTo, from, to, forward), + indexKeySchema.get().hasNextCondition(keyFrom, keyTo, actualFrom, to, forward), binaryFrom, 
binaryTo, forward)); } - final List searchSpace = baseKeySchema.segmentsToSearch(segments, from, to, + final List searchSpace = baseKeySchema.segmentsToSearch(segments, actualFrom, to, forward); - final Bytes binaryFrom = baseKeySchema.lowerRange(keyFrom, from); + final Bytes binaryFrom = baseKeySchema.lowerRange(keyFrom, actualFrom); final Bytes binaryTo = baseKeySchema.upperRange(keyTo, to); return new SegmentIterator<>( searchSpace.iterator(), - baseKeySchema.hasNextCondition(keyFrom, keyTo, from, to, forward), + baseKeySchema.hasNextCondition(keyFrom, keyTo, actualFrom, to, forward), binaryFrom, binaryTo, forward); @@ -235,13 +251,20 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst @Override public KeyValueIterator fetchAll(final long timeFrom, final long timeTo) { - final List searchSpace = segments.segments(timeFrom, timeTo, true); - final Bytes binaryFrom = baseKeySchema.lowerRange(null, timeFrom); + + final long actualFrom = getActualFrom(timeFrom, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema); + + if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && timeTo < actualFrom) { + return KeyValueIterators.emptyIterator(); + } + + final List searchSpace = segments.segments(actualFrom, timeTo, true); + final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom); final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo); return new SegmentIterator<>( searchSpace.iterator(), - baseKeySchema.hasNextCondition(null, null, timeFrom, timeTo, true), + baseKeySchema.hasNextCondition(null, null, actualFrom, timeTo, true), binaryFrom, binaryTo, true); @@ -250,13 +273,20 @@ public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst @Override public KeyValueIterator backwardFetchAll(final long timeFrom, final long timeTo) { - final List searchSpace = segments.segments(timeFrom, timeTo, false); - final Bytes binaryFrom = baseKeySchema.lowerRange(null, 
timeFrom); + + final long actualFrom = getActualFrom(timeFrom, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema); + + if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && timeTo < actualFrom) { + return KeyValueIterators.emptyIterator(); + } + + final List searchSpace = segments.segments(actualFrom, timeTo, false); + final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom); final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo); return new SegmentIterator<>( searchSpace.iterator(), - baseKeySchema.hasNextCondition(null, null, timeFrom, timeTo, false), + baseKeySchema.hasNextCondition(null, null, actualFrom, timeTo, false), binaryFrom, binaryTo, false); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index 6c72fa64c5f..e7b7198d1cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -23,6 +23,6 @@ public class RocksDBSegmentedBytesStore extends AbstractRocksDBSegmentedBytesSto final long retention, final long segmentInterval, final KeySchema keySchema) { - super(name, metricsScope, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval)); + super(name, metricsScope, retention, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval)); } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java index 7fd958c2c27..39f493c761b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java 
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java @@ -23,6 +23,6 @@ public class RocksDBTimestampedSegmentedBytesStore extends AbstractRocksDBSegmen final long retention, final long segmentInterval, final KeySchema keySchema) { - super(name, metricsScope, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval)); + super(name, metricsScope, retention, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java index 9abc2c9500d..91aa583060c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java @@ -64,6 +64,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.Collections; import org.junit.rules.Timeout; import org.junit.runner.RunWith; @@ -195,22 +196,25 @@ public class TimeWindowedKStreamIntegrationTest { startStreams(); + // on window close + // observedStreamTime : 10, retentionPeriod: 10, actualFrom: 0, timeTo: 0, timeFrom: 0 + // observedStreamTime : 15, retentionPeriod: 10, actualFrom: 5, timeTo: 5, timeFrom: 1 + // observedStreamTime : 25, retentionPeriod: 10, actualFrom: 15, timeTo: 15, timeFrom: 6 + final List, String>> windowedMessages = receiveMessagesWithTimestamp( - new TimeWindowedDeserializer<>(new StringDeserializer(), 10L), - new StringDeserializer(), - 10L, - String.class, - emitFinal ? 6 : 12); + new TimeWindowedDeserializer<>(new StringDeserializer(), 10L), + new StringDeserializer(), + 10L, + String.class, + emitFinal ? 
4 : 12); final List, String>> expectResult; if (emitFinal) { expectResult = asList( - new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5), - new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10), - new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11), - new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10), - new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15), - new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+6", 15) + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+6", 15) ); } else { expectResult = asList( @@ -260,20 +264,22 @@ public class TimeWindowedKStreamIntegrationTest { startStreams(); + // on window close + // observedStreamTime : 15, retentionPeriod: 15, actualFrom: 0, timeTo: 0, timeFrom: 0 + // observedStreamTime : 25, retentionPeriod: 15, actualFrom: 10, timeTo: 10, timeFrom: 1 + final List, String>> windowedMessages = receiveMessagesWithTimestamp( new TimeWindowedDeserializer<>(new StringDeserializer(), 10L), new StringDeserializer(), 10L, String.class, - emitFinal ? 6 : 13); + emitFinal ? 
4 : 13); final List, String>> expectResult; if (emitFinal) { expectResult = asList( new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+4", 6), - new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10), - new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11), new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15) ); @@ -342,12 +348,13 @@ public class TimeWindowedKStreamIntegrationTest { startStreams(); + // ON_WINDOW_CLOSE expires all records. List, String>> windowedMessages = receiveMessagesWithTimestamp( new TimeWindowedDeserializer<>(new StringDeserializer(), 10L), new StringDeserializer(), 10L, String.class, - emitFinal ? 5 : 9); + emitFinal ? 4 : 9); List, String>> expectResult; if (emitFinal) { @@ -358,8 +365,6 @@ public class TimeWindowedKStreamIntegrationTest { 5), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+L2,R2", 11), - new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), - "0+L2,R2+L2,R2", 15), new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+L2,R2", 15) ); @@ -403,22 +408,27 @@ public class TimeWindowedKStreamIntegrationTest { // Restart startStreams(); - windowedMessages = receiveMessagesWithTimestamp( - new TimeWindowedDeserializer<>(new StringDeserializer(), 10L), - new StringDeserializer(), - 10L, - String.class, - 2); - if (emitFinal) { - // Output just new closed window for C - expectResult = asList( - new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(20L, 30L)), - "0+L3,R3", 25), + windowedMessages = receiveMessagesWithTimestamp( + new TimeWindowedDeserializer<>(new StringDeserializer(), 10L), + new StringDeserializer(), + 10L, + String.class, + 1); + + // Output just new/unexpired closed window for 
C + expectResult = Collections.singletonList( new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)), "0+L3,R3", 25) ); } else { + windowedMessages = receiveMessagesWithTimestamp( + new TimeWindowedDeserializer<>(new StringDeserializer(), 10L), + new StringDeserializer(), + 10L, + String.class, + 2); + expectResult = asList( new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(30L, 40L)), "0+L3,R3", 35), diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java index 065c63270d3..50b0f8dcdde 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java @@ -1650,22 +1650,7 @@ public class KStreamSlidingWindowAggregateTest { final Map> expected = new HashMap<>(); if (emitFinal) { - expected.put(0L, ValueAndTimestamp.make("ARSTU", 10L)); - expected.put(3L, ValueAndTimestamp.make("ASTU", 10L)); - expected.put(4L, ValueAndTimestamp.make("ATU", 10L)); - expected.put(5L, ValueAndTimestamp.make("ABTU", 15L)); - expected.put(6L, ValueAndTimestamp.make("ABCU", 16L)); - expected.put(8L, ValueAndTimestamp.make("ABCDU", 18L)); - expected.put(9L, ValueAndTimestamp.make("ABCD", 18L)); - expected.put(11L, ValueAndTimestamp.make("BCD", 18L)); - expected.put(16L, ValueAndTimestamp.make("CD", 18L)); - expected.put(17L, ValueAndTimestamp.make("D", 18L)); - expected.put(20L, ValueAndTimestamp.make("E", 30L)); - expected.put(30L, ValueAndTimestamp.make("EF", 40L)); - expected.put(31L, ValueAndTimestamp.make("F", 40L)); - expected.put(45L, ValueAndTimestamp.make("G", 55L)); - expected.put(46L, ValueAndTimestamp.make("GH", 56L)); - expected.put(48L, ValueAndTimestamp.make("GHIJ", 58L)); + // only non-expired records expected.put(52L, 
ValueAndTimestamp.make("GHIJK", 62L)); expected.put(53L, ValueAndTimestamp.make("GHIJKLMN", 63L)); expected.put(56L, ValueAndTimestamp.make("HIJKLMN", 63L)); @@ -1675,6 +1660,7 @@ public class KStreamSlidingWindowAggregateTest { expected.put(66L, ValueAndTimestamp.make("O", 76L)); expected.put(67L, ValueAndTimestamp.make("OP", 77L)); expected.put(70L, ValueAndTimestamp.make("OPQ", 80L)); + } else { expected.put(0L, ValueAndTimestamp.make("ARSTU", 10L)); expected.put(3L, ValueAndTimestamp.make("ASTU", 10L)); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 465b66d188f..23b250e503c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -286,11 +286,12 @@ public class KStreamWindowAggregateTest { inputTopic1.pipeInput("A", "1", 20L); processors.get(0).checkAndClearProcessResult( - new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1", 10), - new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+2+2", 13), - new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+3", 14), - new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+4", 12) + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1", 10), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+2+2", 13), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5, 15)), "0+3", 14), + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+4", 12) ); + processors.get(1).checkAndClearProcessResult(); processors.get(2).checkAndClearProcessResult(); @@ -301,18 +302,24 @@ public class KStreamWindowAggregateTest { inputTopic2.pipeInput("A", "a", 15L); 
processors.get(0).checkAndClearProcessResult(); - processors.get(1).checkAndClearProcessResult( - new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 0), - new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), "0+b", 1), - new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+c", 2) - ); - processors.get(2).checkAndClearProcessResult( - new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), - "0+1+1%0+a", 9), - new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), - "0+2%0+b", 1), - new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+3%0+c", - 2)); + + if (withCache) { + processors.get(1).checkAndClearProcessResult( + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 0), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), "0+b", 1), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+c", 2) + ); + processors.get(2).checkAndClearProcessResult( + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), + "0+1+1%0+a", 9), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), + "0+2%0+b", 1), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+3%0+c", + 2)); + } else { + processors.get(0).checkAndClearProcessResult(); + processors.get(2).checkAndClearProcessResult(); + } inputTopic2.pipeInput("A", "a", 5L); inputTopic2.pipeInput("B", "b", 6L); @@ -321,11 +328,23 @@ public class KStreamWindowAggregateTest { inputTopic2.pipeInput("A", "a", 21L); processors.get(0).checkAndClearProcessResult(); - processors.get(1).checkAndClearProcessResult( - new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a", 5), - new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+b", 6), - new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+d+d", 10) - ); + if (withCache) { + processors.get(1).checkAndClearProcessResult( + new 
KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a", 5), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+b", 6), + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+d+d", 10) + ); + } else { + processors.get(1).checkAndClearProcessResult( + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)), "0+a", 0), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)), "0+b", 1), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0, 10)), "0+c", 2), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+a", 5), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+b", 6), + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5, 15)), "0+d+d", 10) + ); + + } processors.get(2).checkAndClearProcessResult( new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5, 15)), "0+1+1%0+a", 10), diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java index 41876581b38..14edfa861b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java @@ -233,12 +233,21 @@ public class SessionWindowedKStreamImplTest { processData(driver); final SessionStore store = driver.getSessionStore("count-store"); final List, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2")); - assertThat( - data, - equalTo(Arrays.asList( - KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L), - KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L), - KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L)))); + if (!emitFinal) { + assertThat( + data, + equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("1", new 
SessionWindow(10, 15)), 2L), + KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L), + KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L)))); + } else { + assertThat( + data, + equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L), + KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L)))); + + } } } @@ -251,12 +260,21 @@ public class SessionWindowedKStreamImplTest { final SessionStore sessionStore = driver.getSessionStore("reduced"); final List, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2")); - assertThat( - data, - equalTo(Arrays.asList( - KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"), - KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"), - KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2")))); + if (!emitFinal) { + assertThat( + data, + equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"), + KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"), + KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2")))); + } else { + assertThat( + data, + equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"), + KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2")))); + + } } } @@ -272,12 +290,21 @@ public class SessionWindowedKStreamImplTest { processData(driver); final SessionStore sessionStore = driver.getSessionStore("aggregated"); final List, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2")); - assertThat( - data, - equalTo(Arrays.asList( - KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"), - KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"), - KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2")))); + if (!emitFinal) { + assertThat( + data, + equalTo(Arrays.asList( + KeyValue.pair(new 
Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"), + KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"), + KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2")))); + } else { + assertThat( + data, + equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"), + KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2")))); + + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java index 5ac43ac8082..6aa4d17dca2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java @@ -51,6 +51,7 @@ import org.junit.Test; import java.util.List; import java.util.Properties; +import java.util.Collections; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; @@ -198,6 +199,7 @@ public class TimeWindowedKStreamImplTest { } final ArrayList, String>> processed = supplier.theCapturedProcessor().processed(); + if (emitFinal) { assertEquals( asList( @@ -238,11 +240,26 @@ public class TimeWindowedKStreamImplTest { final List, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L))); - assertThat(data, equalTo(asList( - KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L), - KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L), - KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L), - KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L)))); + if (withCache) { + // with cache returns all records (expired from underneath as well) as part of + // the merge process + assertThat(data, equalTo(asList( + KeyValue.pair(new 
Windowed<>("1", new TimeWindow(0, 500)), 2L), + KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L), + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L), + KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L)))); + } else { + // without cache, we get only non-expired record from underlying store. + if (!emitFinal) { + assertThat(data, equalTo(Collections.singletonList( + KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L)))); + } else { + assertThat(data, equalTo(asList( + KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L), + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L), + KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L)))); + } + } } { final WindowStore> windowStore = @@ -250,11 +267,24 @@ public class TimeWindowedKStreamImplTest { final List, ValueAndTimestamp>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L))); - assertThat(data, equalTo(asList( - KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make(2L, 15L)), - KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)), - KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)), - KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L))))); + // the same values and logic described above applies here as well. 
+ if (withCache) { + assertThat(data, equalTo(asList( + KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make(2L, 15L)), + KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)), + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)), + KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L))))); + } else { + if (!emitFinal) { + assertThat(data, equalTo(Collections.singletonList( + KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L))))); + } else { + assertThat(data, equalTo(asList( + KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)), + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)), + KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L))))); + } + } } } } @@ -274,22 +304,37 @@ public class TimeWindowedKStreamImplTest { final List, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L))); - assertThat(data, equalTo(asList( - KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"), - KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"), - KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "10+20"), - KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "30")))); + if (withCache) { + // with cache returns all records (expired from underneath as well) as part of + // the merge process + assertThat(data, equalTo(asList( + KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"), + KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"), + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "10+20"), + KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "30")))); + } else { + // without cache, we get only 
non-expired record from underlying store. + // actualFrom = observedStreamTime(1500) - retentionPeriod(1000) + 1 = 501. + // only 1 record is non expired and would be returned. + assertThat(data, equalTo(Collections.singletonList(KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "30")))); + } } { final WindowStore> windowStore = driver.getTimestampedWindowStore("reduced"); final List, ValueAndTimestamp>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L))); - assertThat(data, equalTo(asList( - KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("1+2", 15L)), - KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("3", 500L)), - KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("10+20", 550L)), - KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L))))); + // same logic/data as explained above. 
+ if (withCache) { + assertThat(data, equalTo(asList( + KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("1+2", 15L)), + KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("3", 500L)), + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("10+20", 550L)), + KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L))))); + } else { + assertThat(data, equalTo(Collections.singletonList( + KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L))))); + } } } } @@ -310,22 +355,36 @@ public class TimeWindowedKStreamImplTest { final List, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L))); - assertThat(data, equalTo(asList( - KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"), - KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"), - KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+10+20"), - KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "0+30")))); + if (withCache) { + // with cache returns all records (expired from underneath as well) as part of + // the merge process + assertThat(data, equalTo(asList( + KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"), + KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"), + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+10+20"), + KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "0+30")))); + } else { + // without cache, we get only non-expired record from underlying store. + // actualFrom = observedStreamTime(1500) - retentionPeriod(1000) + 1 = 501. + // only 1 record is non expired and would be returned. 
+ assertThat(data, equalTo(Collections + .singletonList(KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), "0+30")))); + } } { final WindowStore> windowStore = driver.getTimestampedWindowStore("aggregated"); final List, ValueAndTimestamp>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L))); - - assertThat(data, equalTo(asList( - KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("0+1+2", 15L)), - KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+3", 500L)), - KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+10+20", 550L)), - KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L))))); + if (withCache) { + assertThat(data, equalTo(asList( + KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), ValueAndTimestamp.make("0+1+2", 15L)), + KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+3", 500L)), + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), ValueAndTimestamp.make("0+10+20", 550L)), + KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L))))); + } else { + assertThat(data, equalTo(Collections.singletonList( + KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L))))); + } } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java index 586ff03d62f..c81e57589a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java @@ 
-179,11 +179,10 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest values = bytesStore.fetch( Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) { - - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L) - ); + // For all tests, actualFrom is computed using observedStreamTime - retention + 1. + // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001 + // all records expired as actual from is 59001 and to is 1000 + final List, Long>> expected = Collections.emptyList(); assertEquals(expected, toList(values)); } @@ -191,11 +190,8 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest values = bytesStore.fetch( Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L) - ); + // all records expired as actual from is 59001 and to is 1000 + final List, Long>> expected = Collections.emptyList(); assertEquals(expected, toList(values)); } @@ -203,20 +199,16 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest values = bytesStore.fetch( null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L) - ); - + // all records expired as actual from is 59001 and to is 1000 + final List, Long>> expected = Collections.emptyList(); assertEquals(expected, toList(values)); } try (final KeyValueIterator values = bytesStore.fetch( Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(keyB, windows[2]), 
100L), + // key B is expired as actual from is 59001 + final List, Long>> expected = Collections.singletonList( KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L) ); @@ -226,10 +218,8 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest values = bytesStore.fetch( null, null, 0, windows[3].start())) { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), + // keys A and B expired as actual from is 59001 + final List, Long>> expected = Collections.singletonList( KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L) ); @@ -251,10 +241,10 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest values = bytesStore.backwardFetch( Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) - ); + // For all tests, actualFrom is computed using observedStreamTime - retention + 1. 
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001 + // all records expired as actual from is 59001 and to = 1000 + final List, Long>> expected = Collections.emptyList(); assertEquals(expected, toList(values)); } @@ -262,11 +252,8 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest values = bytesStore.backwardFetch( Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) - ); + // all records expired as actual from is 59001 and to = 1000 + final List, Long>> expected = Collections.emptyList(); assertEquals(expected, toList(values)); } @@ -274,21 +261,17 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest values = bytesStore.backwardFetch( null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) - ); - + // all records expired as actual from is 59001 and to = 1000 + final List, Long>> expected = Collections.emptyList(); assertEquals(expected, toList(values)); } try (final KeyValueIterator values = bytesStore.backwardFetch( Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L), - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L) + // only 1 record left as actual from is 59001 and to = 60,000 + final List, Long>> expected = Collections.singletonList( + KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L) ); assertEquals(expected, toList(values)); @@ -297,11 +280,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest values = 
bytesStore.backwardFetch( null, null, 0, windows[3].start())) { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L), - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) + // only 1 record left as actual from is 59001 and to = 60,000 + final List, Long>> expected = Collections.singletonList( + KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L) ); assertEquals(expected, toList(values)); @@ -854,18 +835,24 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest(keyB, windows[2])), expectedValue3); bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), expectedValue4); + // Record expired as timestampFromRawKey = 1000 while observedStreamTime = 60,000 and retention = 1000. final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession( key1, windows[0].start(), windows[0].end()); - assertEquals(Bytes.wrap(value1), Bytes.wrap(expectedValue1)); + assertNull(value1); + // Record expired as timestampFromRawKey = 1000 while observedStreamTime = 60,000 and retention = 1000. final byte[] value2 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession( key1, windows[1].start(), windows[1].end()); - assertEquals(Bytes.wrap(value2), Bytes.wrap(expectedValue2)); + assertNull(value2); + // expired record + // timestampFromRawKey = 1500 while observedStreamTime = 60,000 and retention = 1000. final byte[] value3 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession( key2, windows[2].start(), windows[2].end()); - assertEquals(Bytes.wrap(value3), Bytes.wrap(expectedValue3)); + assertNull(value3); + // only non-expired record + // timestampFromRawKey = 60,000 while observedStreamTime = 60,000 and retention = 1000. 
final byte[] value4 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession( key3, windows[3].start(), windows[3].end()); assertEquals(Bytes.wrap(value4), Bytes.wrap(expectedValue4)); @@ -991,10 +978,26 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest results = bytesStore.fetch( Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 1, 2000)) { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), - KeyValue.pair(new Windowed<>(keyB, windows[2]), 20L) - ); + final List, Long>> expected; + + // actual from: observedStreamTime - retention + 1 + if (getBaseSchema() instanceof TimeFirstWindowKeySchema) { + // For windowkeyschema, actual from is 1 + // observed stream time = 1000. Retention Period = 1000. + // actual from = (1000 - 1000 + 1) + // and search happens in the range 1-2000 + expected = asList( + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), + KeyValue.pair(new Windowed<>(keyB, windows[2]), 20L) + ); + } else { + // For session key schema, actual from is 501 + // observed stream time = 1500. Retention Period = 1000. 
+ // actual from = (1500 - 1000 + 1) + // and search happens in the range 501-2000 + expected = Collections.singletonList(KeyValue.pair(new Windowed<>(keyB, windows[2]), 20L)); + } + assertEquals(expected, toList(results)); } @@ -1010,11 +1013,27 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest(key, windows[0])), serializeValue(10)); bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50)); bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100)); + // actual from: observedStreamTime - retention + 1 + // retention = 1000 try (final KeyValueIterator results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999)) { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(key, windows[0]), 10L), - KeyValue.pair(new Windowed<>(key, windows[1]), 50L) - ); + + final List, Long>> expected; + + // actual from: observedStreamTime - retention + 1 + if (getBaseSchema() instanceof TimeFirstWindowKeySchema) { + // For windowkeyschema, actual from is 1 + // observed stream time = 1000. actual from = (1000 - 1000 + 1) + // and search happens in the range 1-2000 + expected = asList( + KeyValue.pair(new Windowed<>(key, windows[0]), 10L), + KeyValue.pair(new Windowed<>(key, windows[1]), 50L) + ); + } else { + // For session key schema, actual from is 501 + // observed stream time = 1500. actual from = (1500 - 1000 + 1) + // and search happens in the range 501-2000 deeming first record as expired. + expected = Collections.singletonList(KeyValue.pair(new Windowed<>(key, windows[1]), 50L)); + } assertEquals(expected, toList(results)); } @@ -1054,15 +1073,24 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500)); + // For all tests, actualFrom is computed using observedStreamTime - retention + 1. 
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001 + // don't return expired records. assertEquals( - asList( - KeyValue.pair(new Windowed<>(key, windows[0]), 50L), - KeyValue.pair(new Windowed<>(key, windows[1]), 100L), - KeyValue.pair(new Windowed<>(key, windows[2]), 500L) - ), + Collections.emptyList(), results ); + final List, Long>> results1 = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 59000, 60000)); + + // only non expired record as actual from is 59001 + assertEquals( + Collections.singletonList( + KeyValue.pair(new Windowed<>(key, windows[3]), 1000L) + ), + results1 + ); + segments.close(); } @@ -1086,9 +1114,11 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest, Long>> results = toList(bytesStore.all()); + // actualFrom is computed using observedStreamTime - retention + 1. + // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001 + // only one record returned as actual from is 59001 assertEquals( - asList( - KeyValue.pair(new Windowed<>(keyA, windows[0]), 50L), + Collections.singletonList( KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L) ), results @@ -1115,12 +1145,13 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest, Long>> results = toList(bytesStore.backwardAll()); assertEquals( - asList( - KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L), - KeyValue.pair(new Windowed<>(keyA, windows[0]), 50L) + Collections.singletonList( + KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L) ), results ); @@ -1147,9 +1178,11 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest, Long>> results = toList(bytesStore.fetchAll(0L, 60_000L)); + // For all tests, actualFrom is computed using observedStreamTime - retention + 1. 
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 = 59001 + // only 1 record fetched as actual from is 59001 assertEquals( - asList( - KeyValue.pair(new Windowed<>(key, windows[0]), 50L), + Collections.singletonList( KeyValue.pair(new Windowed<>(key, windows[3]), 100L) ), results @@ -1277,9 +1310,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest, Long>> expected = new ArrayList<>(); - expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L)); expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L)); + // after restoration, only 1 record should be returned as actual from is 59001 and the prior record is expired. final List, Long>> results = toList(bytesStore.all()); assertEquals(expected, results); } @@ -1332,10 +1365,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest, Long>> expected = new ArrayList<>(); - expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L)); - expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L)); expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L)); + // after restoration, only non expired segments should be returned which is one as actual from is 59001 final List, Long>> results = toList(bytesStore.all()); assertEquals(expected, results); assertThat(bytesStore.getPosition(), Matchers.notNullValue()); @@ -1368,9 +1400,9 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest, Long>> expected = new ArrayList<>(); - expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L)); - expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L)); expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L)); final List, Long>> results = toList(bytesStore.all()); @@ -1407,7 +1439,15 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest, Long>> expected = new ArrayList<>(); - expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L)); + + // actual from = 
observedStreamTime - retention + 1. + // retention = 1000 + if (getBaseSchema() instanceof TimeFirstWindowKeySchema) { + // For window stores, observedStreamTime = 1000 => actualFrom = 1 + // For session stores, observedStreamTime = 1500 => actualFrom = 501 which deems + // the below record as expired. + expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L)); + } final List, Long>> results = toList(bytesStore.all()); assertEquals(expected, results); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java index 49ff61992f8..6e1b3bfcf8c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java @@ -171,44 +171,30 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest try (final KeyValueIterator values = bytesStore.fetch( Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) { - - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L) - ); - - assertEquals(expected, toList(values)); + // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1) + // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000 + assertEquals(Collections.emptyList(), toList(values)); } try (final KeyValueIterator values = bytesStore.fetch( Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { - - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L) - ); - - assertEquals(expected, toList(values)); + 
// All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1) + // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000 + assertEquals(Collections.emptyList(), toList(values)); } try (final KeyValueIterator values = bytesStore.fetch( null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { - - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L) - ); - - assertEquals(expected, toList(values)); + // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1) + // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000 + assertEquals(Collections.emptyList(), toList(values)); } try (final KeyValueIterator values = bytesStore.fetch( Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) { - - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), + // Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1) + // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000 + final List, Long>> expected = Collections.singletonList( KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L) ); @@ -217,11 +203,9 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest try (final KeyValueIterator values = bytesStore.fetch( null, null, 0, windows[3].start())) { - - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), + // Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1) + // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000 + final List, Long>> expected = 
Collections.singletonList( KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L) ); @@ -242,44 +226,33 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest try (final KeyValueIterator values = bytesStore.backwardFetch( Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) { - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) - ); - - assertEquals(expected, toList(values)); + // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1) + // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000 + assertEquals(Collections.emptyList(), toList(values)); } try (final KeyValueIterator values = bytesStore.backwardFetch( Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) - ); - - assertEquals(expected, toList(values)); + // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1) + // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000 + assertEquals(Collections.emptyList(), toList(values)); } try (final KeyValueIterator values = bytesStore.backwardFetch( null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) - ); - - assertEquals(expected, toList(values)); + // All Records expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1) + // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000 + assertEquals(Collections.emptyList(), 
toList(values)); } try (final KeyValueIterator values = bytesStore.backwardFetch( Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) { - - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L), - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L) + // Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1) + // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000 + final List, Long>> expected = Collections.singletonList( + KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L) ); assertEquals(expected, toList(values)); @@ -287,12 +260,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest try (final KeyValueIterator values = bytesStore.backwardFetch( null, null, 0, windows[3].start())) { - - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L), - KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), - KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), - KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) + // Only 1 record not expired as observed stream time = 60000 implying actual-from = 59001 (60000 - 1000 + 1) + // for WindowKeySchema, to = 60000 while for SessionKeySchema, to = 30000 + final List, Long>> expected = Collections.singletonList( + KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L) ); assertEquals(expected, toList(values)); @@ -306,10 +277,18 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50)); bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100)); try (final KeyValueIterator results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999)) { - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(key, windows[0]), 10L), - KeyValue.pair(new Windowed<>(key, windows[1]), 50L) - ); + final List, Long>> expected = new 
ArrayList<>(); + /* + * For WindowKeySchema, the observedStreamTime is 1000 which means 1 extra record gets returned while for + * SessionKeySchema, it's 1500. Which changes the actual-from while fetching. In case of SessionKeySchema, the + * fetch happens from 501-999 while for WindowKeySchema it's from 1-999. + */ + if (schema instanceof SessionKeySchema) { + expected.add(KeyValue.pair(new Windowed<>(key, windows[1]), 50L)); + } else { + expected.add(KeyValue.pair(new Windowed<>(key, windows[0]), 10L)); + expected.add(KeyValue.pair(new Windowed<>(key, windows[1]), 50L)); + } assertEquals(expected, toList(results)); } @@ -341,16 +320,13 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1)), segmentDirs()); final List, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500)); - + /* + * All records expired as observed stream time = 60,000 which sets actual-from to 59001(60,000 - 1000 + 1). to = 1500. + */ assertEquals( - Arrays.asList( - KeyValue.pair(new Windowed<>(key, windows[0]), 50L), - KeyValue.pair(new Windowed<>(key, windows[1]), 100L), - KeyValue.pair(new Windowed<>(key, windows[2]), 500L) - ), + Collections.emptyList(), results ); - segments.close(); } @@ -371,11 +347,12 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest ), segmentDirs() ); - + /* + * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX. + */ final List, Long>> results = toList(bytesStore.all()); assertEquals( - Arrays.asList( - KeyValue.pair(new Windowed<>(key, windows[0]), 50L), + Collections.singletonList( KeyValue.pair(new Windowed<>(key, windows[3]), 100L) ), results @@ -401,11 +378,12 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest ), segmentDirs() ); - + /* + * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = 60,000. 
+ */ final List, Long>> results = toList(bytesStore.fetchAll(0L, 60_000L)); assertEquals( - Arrays.asList( - KeyValue.pair(new Windowed<>(key, windows[0]), 50L), + Collections.singletonList( KeyValue.pair(new Windowed<>(key, windows[3]), 100L) ), results @@ -529,8 +507,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest // 2 segments are created during restoration. assertEquals(2, bytesStore.getSegments().size()); + /* + * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX. + */ final List, Long>> expected = new ArrayList<>(); - expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L)); expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L)); final List, Long>> results = toList(bytesStore.all()); @@ -584,9 +564,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest assertEquals(2, bytesStore.getSegments().size()); final String key = "a"; + /* + * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX. + */ final List, Long>> expected = new ArrayList<>(); - expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L)); - expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L)); expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L)); final List, Long>> results = toList(bytesStore.all()); @@ -621,9 +602,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest assertEquals(2, bytesStore.getSegments().size()); final String key = "a"; + /* + * Only 1 record returned. observed stream time = 60000, actual from = 59001 (60000 - 1000 + 1) and to = Long.MAX. 
+ */ final List, Long>> expected = new ArrayList<>(); - expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L)); - expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L)); expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L)); final List, Long>> results = toList(bytesStore.all()); @@ -659,11 +641,20 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest // 1 segments are created during restoration. assertEquals(1, bytesStore.getSegments().size()); final String key = "a"; - final List, Long>> expected = new ArrayList<>(); - expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L)); + /* + * For WindowKeySchema, the observedStreamTime is 1000 which means 1 extra record gets returned while for + * SessionKeySchema, it's 1500. Which changes the actual-from while fetching. In case of SessionKeySchema, the + * fetch happens from 501 to end while for WindowKeySchema it's from 1 to end. + */ final List, Long>> results = toList(bytesStore.all()); - assertEquals(expected, results); + if (schema instanceof SessionKeySchema) { + assertEquals(Collections.emptyList(), results); + } else { + final List, Long>> expected = new ArrayList<>(); + expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L)); + assertEquals(expected, results); + } assertThat(bytesStore.getPosition(), Matchers.notNullValue()); assertThat(bytesStore.getPosition().getPartitionPositions("A"), hasEntry(0, 2L)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java index efbcd8855a1..56c77ce4fe1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.test.TestUtils; import org.junit.After; import 
org.junit.Before; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import java.util.ArrayList; import java.util.Arrays; @@ -58,6 +59,7 @@ import java.util.Properties; import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.utils.Utils.toList; import static org.apache.kafka.test.StreamsTestUtils.valuesToSet; import static org.hamcrest.CoreMatchers.equalTo; @@ -913,7 +915,13 @@ public abstract class AbstractSessionBytesStoreTest { try (final KeyValueIterator, Long> iterator = sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE) ) { - assertEquals(valuesToSet(iterator), new HashSet<>(Arrays.asList(2L, 3L, 4L))); + if (getStoreType() == StoreType.InMemoryStore) { + assertEquals(valuesToSet(iterator), new HashSet<>(Arrays.asList(2L, 3L, 4L))); + } else { + // The 2 records with values 2L and 3L are considered expired as + // their end times < observed stream time - retentionPeriod + 1. 
+ Assertions.assertEquals(valuesToSet(iterator), new HashSet<>(Collections.singletonList(4L))); + } } } @@ -934,4 +942,43 @@ public abstract class AbstractSessionBytesStoreTest { final Position actual = sessionStore.getPosition(); assertThat(expected, is(actual)); } + + @Test + public void shouldNotFetchExpiredSessions() { + final long systemTime = Time.SYSTEM.milliseconds(); + sessionStore.put(new Windowed<>("p", new SessionWindow(systemTime - 3 * RETENTION_PERIOD, systemTime - 2 * RETENTION_PERIOD)), 1L); + sessionStore.put(new Windowed<>("q", new SessionWindow(systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)), 4L); + sessionStore.put(new Windowed<>("r", new SessionWindow(systemTime - RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)), 3L); + sessionStore.put(new Windowed<>("p", new SessionWindow(systemTime - RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)), 2L); + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("p", systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD) + ) { + Assertions.assertEquals(mkSet(2L), valuesToSet(iterator)); + } + try (final KeyValueIterator, Long> iterator = + sessionStore.backwardFindSessions("p", systemTime - 5 * RETENTION_PERIOD, systemTime - 4 * RETENTION_PERIOD) + ) { + Assertions.assertFalse(iterator.hasNext()); + } + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("p", "r", systemTime - 5 * RETENTION_PERIOD, systemTime - 4 * RETENTION_PERIOD) + ) { + Assertions.assertFalse(iterator.hasNext()); + } + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("p", "r", systemTime - RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2) + ) { + Assertions.assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L)); + } + try (final KeyValueIterator, Long> iterator = + sessionStore.findSessions("p", "r", systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD) + ) { + Assertions.assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 
4L)); + } + try (final KeyValueIterator, Long> iterator = + sessionStore.backwardFindSessions("p", "r", systemTime - 2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD) + ) { + Assertions.assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L)); + } + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java index dbd46111fbb..d29c6bf88d4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java @@ -342,7 +342,7 @@ public abstract class AbstractWindowBytesStoreTest { ); assertEquals( asList(zero, one, two, three), - toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3))) + toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime), ofEpochMilli(defaultStartTime + 3))) ); assertEquals( asList(one, two, three, four, five), @@ -360,7 +360,7 @@ public abstract class AbstractWindowBytesStoreTest { ); assertEquals( asList(three, two, one, zero), - toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3))) + toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime), ofEpochMilli(defaultStartTime + 3))) ); assertEquals( asList(five, four, three, two, one), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java index ead362a7b39..27fddc9b075 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java @@ -101,7 +101,8 @@ public class 
CachingPersistentWindowStoreTest { @Before public void setUp() { keySchema = new WindowKeySchema(); - bytesStore = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0, SEGMENT_INTERVAL, keySchema); + ///KAFKA-12960: Adding a retention of 100 ms to make all test cases work as is. + bytesStore = new RocksDBSegmentedBytesStore("test", "metrics-scope", 100, SEGMENT_INTERVAL, keySchema); underlyingStore = new RocksDBWindowStore(bytesStore, false, WINDOW_SIZE); final TimeWindowedDeserializer keyDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE); keyDeserializer.setIsChangelogTopic(true); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index 922608d4994..044f9484378 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -95,6 +95,7 @@ public class MeteredSessionStoreTest { private static final byte[] VALUE_BYTES = VALUE.getBytes(); private static final long START_TIMESTAMP = 24L; private static final long END_TIMESTAMP = 42L; + private static final int RETENTION_PERIOD = 100; private final String threadId = Thread.currentThread().getName(); private final TaskId taskId = new TaskId(0, 0, "My-Topology"); @@ -429,6 +430,54 @@ public class MeteredSessionStoreTest { verify(innerStore); } + @Test + public void shouldReturnNoSessionsWhenFetchedKeyHasExpired() { + final long systemTime = Time.SYSTEM.milliseconds(); + expect(innerStore.findSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) + .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); + init(); + + final KeyValueIterator, String> iterator = store.findSessions(KEY, systemTime - RETENTION_PERIOD, systemTime); + assertFalse(iterator.hasNext()); + 
iterator.close(); + } + + @Test + public void shouldReturnNoSessionsInBackwardOrderWhenFetchedKeyHasExpired() { + final long systemTime = Time.SYSTEM.milliseconds(); + expect(innerStore.backwardFindSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) + .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); + init(); + + final KeyValueIterator, String> iterator = store.backwardFindSessions(KEY, systemTime - RETENTION_PERIOD, systemTime); + assertFalse(iterator.hasNext()); + iterator.close(); + } + + @Test + public void shouldNotFindExpiredSessionRangeFromStore() { + final long systemTime = Time.SYSTEM.milliseconds(); + expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) + .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); + init(); + + final KeyValueIterator, String> iterator = store.findSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime); + assertFalse(iterator.hasNext()); + iterator.close(); + } + + @Test + public void shouldNotFindExpiredSessionRangeInBackwardOrderFromStore() { + final long systemTime = Time.SYSTEM.milliseconds(); + expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) + .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); + init(); + + final KeyValueIterator, String> iterator = store.backwardFindSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime); + assertFalse(iterator.hasNext()); + iterator.close(); + } + @Test public void shouldRecordRestoreTimeOnInit() { init(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index ca6a518eb4c..20eb5ec88a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -82,6 +83,7 @@ public class MeteredWindowStoreTest { private static final String VALUE = "value"; private static final byte[] VALUE_BYTES = VALUE.getBytes(); private static final int WINDOW_SIZE_MS = 10; + private static final int RETENTION_PERIOD = 100; private static final long TIMESTAMP = 42L; private final String threadId = Thread.currentThread().getName(); @@ -270,6 +272,18 @@ public class MeteredWindowStoreTest { verify(innerStoreMock); } + @Test + public void shouldReturnNoRecordWhenFetchedKeyHasExpired() { + expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1 + RETENTION_PERIOD)) + .andReturn(KeyValueIterators.emptyWindowStoreIterator()); + replay(innerStoreMock); + + store.init((StateStoreContext) context, store); + store.fetch("a", ofEpochMilli(1), ofEpochMilli(1).plus(RETENTION_PERIOD, ChronoUnit.MILLIS)).close(); // recorded on close; + + verify(innerStoreMock); + } + @Test public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() { expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index c0c7e963e6e..d2f5289a1ad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -187,19 +187,40 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest { ), segmentDirs(baseDir) ); - + // For all tests, for WindowStore actualFrom is 
computed using observedStreamTime - retention + 1. + // while for TimeOrderedWindowStores, actualFrom = observedStreamTime - retention + // expired record assertEquals( - new HashSet<>(Collections.singletonList("zero")), + new HashSet<>(Collections.emptyList()), valuesToSet(windowStore.fetch( 0, ofEpochMilli(startTime - WINDOW_SIZE), ofEpochMilli(startTime + WINDOW_SIZE)))); - assertEquals( - new HashSet<>(Collections.singletonList("one")), - valuesToSet(windowStore.fetch( - 1, - ofEpochMilli(startTime + increment - WINDOW_SIZE), - ofEpochMilli(startTime + increment + WINDOW_SIZE)))); + // RocksDbWindwStore => + // from = 149997 + // to = 150003 + // actualFrom = 150001 + // record one timestamp is 150,000 So, it's ignored. + // RocksDBTimeOrderedWindowStore*Index => + // from = 149997 + // to = 150003 + // actualFrom = 150000, hence not ignored + if (storeType == StoreType.RocksDBWindowStore) { + assertEquals( + new HashSet<>(Collections.emptyList()), + valuesToSet(windowStore.fetch( + 1, + ofEpochMilli(startTime + increment - WINDOW_SIZE), + ofEpochMilli(startTime + increment + WINDOW_SIZE)))); + + } else { + assertEquals( + new HashSet<>(Collections.singletonList("one")), + valuesToSet(windowStore.fetch( + 1, + ofEpochMilli(startTime + increment - WINDOW_SIZE), + ofEpochMilli(startTime + increment + WINDOW_SIZE)))); + } assertEquals( new HashSet<>(Collections.singletonList("two")), valuesToSet(windowStore.fetch( @@ -247,12 +268,32 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest { 1, ofEpochMilli(startTime + increment - WINDOW_SIZE), ofEpochMilli(startTime + increment + WINDOW_SIZE)))); - assertEquals( - new HashSet<>(Collections.singletonList("two")), - valuesToSet(windowStore.fetch( - 2, - ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE), - ofEpochMilli(startTime + increment * 2 + WINDOW_SIZE)))); + // RocksDbWindwStore => + // from = 179997 + // to = 180003 + // actualFrom = 170001 + // record one timestamp is 180,000 So, it's 
ignored. + // RocksDBTimeOrderedWindowStore*Index => + // from = 179997 + // to = 180003 + // actualFrom = 180000, hence not ignored + if (storeType == StoreType.RocksDBWindowStore) { + assertEquals( + // expired record + new HashSet<>(Collections.emptyList()), + valuesToSet(windowStore.fetch( + 2, + ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE), + ofEpochMilli(startTime + increment * 2 + WINDOW_SIZE)))); + } else { + assertEquals( + // expired record + new HashSet<>(Collections.singletonList("two")), + valuesToSet(windowStore.fetch( + 2, + ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE), + ofEpochMilli(startTime + increment * 2 + WINDOW_SIZE)))); + } assertEquals( new HashSet<>(Collections.emptyList()), valuesToSet(windowStore.fetch( @@ -301,7 +342,8 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest { ofEpochMilli(startTime + increment - WINDOW_SIZE), ofEpochMilli(startTime + increment + WINDOW_SIZE)))); assertEquals( - new HashSet<>(Collections.singletonList("two")), + // expired record + new HashSet<>(Collections.emptyList()), valuesToSet(windowStore.fetch( 2, ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE), @@ -371,12 +413,24 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest { 3, ofEpochMilli(startTime + increment * 3 - WINDOW_SIZE), ofEpochMilli(startTime + increment * 3 + WINDOW_SIZE)))); - assertEquals( - new HashSet<>(Collections.singletonList("four")), - valuesToSet(windowStore.fetch( - 4, - ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE), - ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE)))); + if (storeType == StoreType.RocksDBWindowStore) { + assertEquals( + // expired record + new HashSet<>(Collections.emptyList()), + valuesToSet(windowStore.fetch( + 4, + ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE), + ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE)))); + } else { + assertEquals( + // expired record + new HashSet<>(Collections.singletonList("four")), + 
valuesToSet(windowStore.fetch( + 4, + ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE), + ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE)))); + + } assertEquals( new HashSet<>(Collections.singletonList("five")), valuesToSet(windowStore.fetch( @@ -465,7 +519,14 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest { iter.next(); fetchedCount++; } - assertEquals(2, fetchedCount); + // 1 extra record is expired in the case of RocksDBWindowStore as + // actualFrom = observedStreamTime - retentionPeriod + 1. The +1 + // isn't present for RocksDbTimeOrderedStoreWith*Index + if (storeType == StoreType.RocksDBWindowStore) { + assertEquals(1, fetchedCount); + } else { + assertEquals(2, fetchedCount); + } assertEquals( Utils.mkSet(segments.segmentName(1L), segments.segmentName(3L)), @@ -480,8 +541,10 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest { iter.next(); fetchedCount++; } - assertEquals(1, fetchedCount); + // the latest record has a timestamp > 60k. So, the +1 in actualFrom calculation in + // RocksDbWindowStore shouldn't have an implciation and all stores should return the same fetched counts. + assertEquals(1, fetchedCount); assertEquals( Utils.mkSet(segments.segmentName(3L), segments.segmentName(5L)), segmentDirs(baseDir) @@ -564,6 +627,9 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest { Serdes.String()); windowStore.init((StateStoreContext) context, windowStore); + // For all tests, for WindowStore actualFrom is computed using observedStreamTime - retention + 1. 
+ // while for TimeOrderedWindowStores, actualFrom = observedStreamTime - retention + assertEquals( new HashSet<>(Collections.emptyList()), valuesToSet(windowStore.fetch( @@ -650,12 +716,31 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest { 3, ofEpochMilli(startTime + increment * 3 - WINDOW_SIZE), ofEpochMilli(startTime + increment * 3 + WINDOW_SIZE)))); - assertEquals( - new HashSet<>(Collections.singletonList("four")), - valuesToSet(windowStore.fetch( - 4, - ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE), - ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE)))); + // RocksDbWindwStore => + // from = 239,997 + // to = 240,003 + // actualFrom = 240,001 + // record four timestamp is 240,000 So, it's ignored. + // RocksDBTimeOrderedWindowStore*Index => + // from = 239,997 + // to = 240,003 + // actualFrom = 240,000, hence not ignored + if (storeType == StoreType.RocksDBWindowStore) { + assertEquals( + new HashSet<>(Collections.emptyList()), + valuesToSet(windowStore.fetch( + 4, + ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE), + ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE)))); + } else { + assertEquals( + new HashSet<>(Collections.singletonList("four")), + valuesToSet(windowStore.fetch( + 4, + ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE), + ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE)))); + + } assertEquals( new HashSet<>(Collections.singletonList("five")), valuesToSet(windowStore.fetch(