KAFKA-9524: increase retention time for window and grace periods longer than one day (#10091)

Reviewers: Victoria Xia <victoria.xia@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Marco Aurelio Lotz 2021-02-19 03:18:53 +01:00 committed by GitHub
parent e29f7a36db
commit c8112b5ecd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 4 deletions

View File

@ -57,6 +57,8 @@ import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAU
*/ */
public final class TimeWindows extends Windows<TimeWindow> { public final class TimeWindows extends Windows<TimeWindow> {
private static final long EMPTY_GRACE_PERIOD = -1;
private final long maintainDurationMs; private final long maintainDurationMs;
/** The size of the windows in milliseconds. */ /** The size of the windows in milliseconds. */
@ -111,7 +113,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero."); throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
} }
// This is a static factory method, so we initialize grace and retention to the defaults. // This is a static factory method, so we initialize grace and retention to the defaults.
return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS); return new TimeWindows(sizeMs, sizeMs, EMPTY_GRACE_PERIOD, DEFAULT_RETENTION_MS);
} }
/** /**
@ -214,7 +216,10 @@ public final class TimeWindows extends Windows<TimeWindow> {
// NOTE: in the future, when we remove maintainMs, // NOTE: in the future, when we remove maintainMs,
// we should default the grace period to 24h to maintain the default behavior, // we should default the grace period to 24h to maintain the default behavior,
// or we can default to (24h - size) if you want to be super accurate. // or we can default to (24h - size) if you want to be super accurate.
return graceMs != -1 ? graceMs : maintainMs() - size(); if (graceMs != EMPTY_GRACE_PERIOD) {
return graceMs;
}
return Math.max(maintainDurationMs - sizeMs, 0);
} }
/** /**
@ -245,7 +250,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
@Override @Override
@Deprecated @Deprecated
public long maintainMs() { public long maintainMs() {
return Math.max(maintainDurationMs, sizeMs); return Math.max(maintainDurationMs, sizeMs + gracePeriodMs());
} }
@SuppressWarnings("deprecation") // removing segments from Windows will fix this @SuppressWarnings("deprecation") // removing segments from Windows will fix this

View File

@ -19,8 +19,10 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.junit.Test; import org.junit.Test;
import java.time.Duration;
import java.util.Map; import java.util.Map;
import static java.time.Duration.ofDays;
import static java.time.Duration.ofMillis; import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.EqualityCheck.verifyEquality; import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
import static org.apache.kafka.streams.EqualityCheck.verifyInEquality; import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
@ -53,11 +55,18 @@ public class TimeWindowsTest {
@SuppressWarnings("deprecation") // specifically testing deprecated APIs @SuppressWarnings("deprecation") // specifically testing deprecated APIs
@Test @Test
public void shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() { public void shouldUseWindowSizeAsRetentionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() {
final long windowSize = 2 * TimeWindows.of(ofMillis(1)).maintainMs(); final long windowSize = 2 * TimeWindows.of(ofMillis(1)).maintainMs();
assertEquals(windowSize, TimeWindows.of(ofMillis(windowSize)).maintainMs()); assertEquals(windowSize, TimeWindows.of(ofMillis(windowSize)).maintainMs());
} }
@Test
public void shouldUseWindowSizeAndGraceAsRetentionTimeIfBothCombinedAreLargerThanDefaultRetentionTime() {
final Duration windowsSize = ofDays(1).minus(ofMillis(1));
final Duration gracePeriod = ofMillis(2);
assertEquals(windowsSize.toMillis() + gracePeriod.toMillis(), TimeWindows.of(windowsSize).grace(gracePeriod).maintainMs());
}
@Test @Test
public void windowSizeMustNotBeZero() { public void windowSizeMustNotBeZero() {
assertThrows(IllegalArgumentException.class, () -> TimeWindows.of(ofMillis(0))); assertThrows(IllegalArgumentException.class, () -> TimeWindows.of(ofMillis(0)));