KAFKA-13021: clarify KIP-633 javadocs and address remaining feedback (#11114)

There were a few followup things to address from #10926, most importantly a number of updates to the javadocs. Also includes a few missing verification checks.

Reviewers: Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <mjsax@apache.org>, Israel Ekpo
This commit is contained in:
A. Sophie Blee-Goldman 2021-07-23 16:14:37 -07:00
parent e9416eed2d
commit ace3bbbc73
9 changed files with 126 additions and 95 deletions

View File

@ -135,25 +135,32 @@
We removed the default implementation of <code>RocksDBConfigSetter#close()</code>.
</p>
<p>
We dropped the default 24 hours grace period for windowed operations such as Window or Session aggregates, or stream-stream joins.
This period determines how long after a window ends any out-of-order records will still be processed.
Records coming in after the grace period has elapsed will be dropped from those windows.
With a long grace period, though Kafka Streams can handle out-of-order data up to that amount of time, it will also incur a high and confusing latency for users,
e.g. suppression operators with the default won't emit results up for 24 hours, while in practice out-of-order data usually has a much smaller time-skewness.
Instead of abstracting this config from users with a long default value, we introduced new constructs such as <code>TimeWindows#ofSizeAndGrace</code> to let callers always set it upon constructing the windows;
the other setters such as <code>TimeWindows#grace</code> are deprecated and will be removed in the future.
We dropped the default 24 hours grace period for windowed operations such as Window or Session aggregates, or
stream-stream joins. This period determines how long after a window ends any out-of-order records will still
be processed. Records coming in after the grace period has elapsed are considered late and will be dropped.
But in operators such as suppression, a large grace period has the drawback of incurring an equally large
output latency. The current API made it all too easy to miss the grace period config completely, leading you
to wonder why your application seems to produce no output -- it actually is, but not for 24 hours.
<p>
To prevent accidentally or unknowingly falling back to the default 24hr grace period, we deprecated all of the
existing static constructors for the <code>Windows</code> classes (such as <code>TimeWindows#of</code>). These
are replaced by new static constructors of two flavors: <code>#ofSizeAndGrace</code> and <code>#ofSizeWithNoGrace</code>
(these are for the <code>TimeWindows</code> class; analogous APIs exist for the <code>JoinWindows</code>,
<code>SessionWindows</code>, and SlidingWindows classes). With these new APIs you are forced to set the grace
period explicitly, or else consciously choose to opt out by selecting the <code>WithNoGrace</code> flavor which
sets it to 0 for situations where you really don't care about the grace period, for example during testing or
when playing around with Kafka Streams for the first time. Note that using the new APIs for the
<code>JoinWindows</code> class will also enable a fix for spurious left/outer join results, as described in
the following paragraph. For more details on the grace period and new static constructors, see
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams">KIP-633</a>
</p>
<p>
Additionally, in older versions Kafka Streams emitted stream-stream left/outer join results eagerly. This behavior may lead to spurious left/outer join result records.
In this release, we changed the behavior to avoid spurious results and left/outer join result are only emitted after the join window is closed, i.e., after the grace period elapsed.
To maintain backward compatibility, the old API <code>JoinWindows#of(timeDifference)</code> preserves the old eager-emit behavior and only the new
APIs <code>JoinWindows#ofTimeDifferenceAndGrace()</code> and <code>JoinsWindows#ofTimeDifferenceNoGrace</code> enable the new behavior.
APIs <code>JoinWindows#ofTimeDifferenceAndGrace()</code> and <code>JoinsWindows#ofTimeDifferenceNoGrace</code> enable the new behavior. Check out
<a href="https://issues.apache.org/jira/browse/KAFKA-10847">KAFKA-10847</a> for more information.
</p>
<ul>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams">KIP-633</a>: Drop 24 hour default of grace period in Streams</li>
<li><a href="https://issues.apache.org/jira/browse/KAFKA-10847">KAFKA-10847</a>: Avoid spurious left/outer join results in stream-stream join</li>
</ul>
<p>
The public <code>topicGroupId</code> and <code>partition</code> fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new <code>TaskId.subtopology()</code>
(which replaces <code>topicGroupId</code>) and <code>TaskId.partition()</code> APIs instead. Also, the <code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been deprecated

View File

@ -102,49 +102,61 @@ public class JoinWindows extends Windows<Window> {
/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
* the duration specified by {@code afterWindowEnd} which means that out of order records arriving
* after the window end will be dropped. The delay is defined as (stream_time - record_timestamp).
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after
* the timestamp of the record from the primary stream.
* <p>
* Using this method explicitly sets the grace period to the duration specified by {@code afterWindowEnd}, which
* means that only out-of-order records arriving more than the grace period after the window end will be dropped.
* The window close, after which any incoming records are considered late and will be rejected, is defined as
* {@code windowEnd + afterWindowEnd}
*
* @param timeDifference join window interval
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
* @return A new JoinWindows object with the specified window definition and grace period
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
* if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
*/
public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
final String timeDifferenceMsgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
final long timeDifferenceMs = validateMillisecondDuration(timeDifference, timeDifferenceMsgPrefix);
final String afterWindowEndMsgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
return new JoinWindows(timeDifferenceMs, timeDifferenceMs, afterWindowEndMs, true);
}
/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
* which means that out of order records arriving after the window end will be dropped.
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after
* the timestamp of the record from the primary stream.
* <p>
* CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order
* records arriving after the window ends are considered late and will be dropped.
*
* @param timeDifference join window interval
* @return a new JoinWindows object with the window definition and no grace period. Note that this means out-of-order records arriving after the window end will be dropped
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
* @return a new JoinWindows object with the window definition and no grace period. Note that this means out of order records arriving after the window end will be dropped
*/
public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), NO_GRACE_PERIOD, true);
return ofTimeDifferenceAndGrace(timeDifference, Duration.ofMillis(NO_GRACE_PERIOD));
}
/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after
* the timestamp of the record from the primary stream.
*
* @param timeDifference
* @param timeDifference join window interval
* @return a new JoinWindows object with the window definition with and grace period (default to 24 hours minus {@code timeDifference})
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)}} instead
* @deprecated since 3.0. Use {@link #ofTimeDifferenceWithNoGrace(Duration)}} instead
*/
@Deprecated
public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix);
return new JoinWindows(timeDifferenceMs, timeDifferenceMs, Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - timeDifferenceMs * 2, 0), false);
return new JoinWindows(timeDifferenceMs, timeDifferenceMs, Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - timeDifferenceMs * 2, 0), false);
}
/**
@ -203,16 +215,14 @@ public class JoinWindows extends Windows<Window> {
*
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0. Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
*/
@Deprecated
public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
//TODO KAFKA-13021: disallow calling grace() if it was already set via ofTimeDifferenceAndGrace/WithNoGrace()
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
return new JoinWindows(beforeMs, afterMs, afterWindowEndMs, false);
}

View File

@ -23,7 +23,7 @@ import java.util.Objects;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
import static java.time.Duration.ofMillis;
@ -91,16 +91,16 @@ public final class SessionWindows {
/**
* Creates a new window specification with the specified inactivity gap.
* Using the method implicitly sets the grace period to zero which
* means that out of order records arriving after the window end will be dropped
*
* <p>
* Note that new events may change the boundaries of session windows, so aggressive
* close times can lead to surprising results in which an out-of-order event is rejected and then
* a subsequent event moves the window boundary forward.
* <p>
* CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order
* records arriving after the window ends are considered late and will be dropped.
*
* @param inactivityGap the gap of inactivity between sessions
* @return a window definition with the window size and no grace period. Note that this means out of order records arriving after the window end will be dropped
* @return a window definition with the window size and no grace period. Note that this means out-of-order records arriving after the window end will be dropped
* @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds}
*/
public static SessionWindows ofInactivityGapWithNoGrace(final Duration inactivityGap) {
@ -109,23 +109,31 @@ public final class SessionWindows {
/**
* Creates a new window specification with the specified inactivity gap.
* Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd} which
* means that out of order records arriving after the window end will be dropped
*
* <p>
* Note that new events may change the boundaries of session windows, so aggressive
* close times can lead to surprising results in which an out-of-order event is rejected and then
* a subsequent event moves the window boundary forward.
* <p>
* Using this method explicitly sets the grace period to the duration specified by {@code afterWindowEnd}, which
* means that only out-of-order records arriving more than the grace period after the window end will be dropped.
* The window close, after which any incoming records are considered late and will be rejected, is defined as
* {@code windowEnd + afterWindowEnd}
*
* @param inactivityGap the gap of inactivity between sessions
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @return A SessionWindows object with the specified inactivity gap and grace period
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
* @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds}
* if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
*/
public static SessionWindows ofInactivityGapAndGrace(final Duration inactivityGap, final Duration afterWindowEnd) {
return new SessionWindows(inactivityGap.toMillis(), afterWindowEnd.toMillis());
}
final String inactivityGapMsgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
final long inactivityGapMs = validateMillisecondDuration(inactivityGap, inactivityGapMsgPrefix);
final String afterWindowEndMsgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
return new SessionWindows(inactivityGapMs, afterWindowEndMs);
}
/**
* Create a new window specification with the specified inactivity gap.
@ -133,14 +141,14 @@ public final class SessionWindows {
* @param inactivityGap the gap of inactivity between sessions
* @return a new window specification without specifying a grace period (default to 24 hours minus {@code inactivityGap})
* @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofInactivityGapWithNoGrace(Duration)} instead
* @deprecated since 3.0. Use {@link #ofInactivityGapWithNoGrace(Duration)} instead
*/
@Deprecated
public static SessionWindows with(final Duration inactivityGap) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
final long inactivityGapMs = validateMillisecondDuration(inactivityGap, msgPrefix);
return new SessionWindows(inactivityGapMs, Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - inactivityGapMs, 0));
return new SessionWindows(inactivityGapMs, Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - inactivityGapMs, 0));
}
/**
@ -153,11 +161,12 @@ public final class SessionWindows {
*
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, Duration)} instead
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0. Use {@link #ofInactivityGapAndGrace(Duration, Duration)} instead
*/
@Deprecated
public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
//TODO KAFKA-13021: disallow calling grace() if it was already set via ofTimeDifferenceAndGrace/WithNoGrace()
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);

View File

@ -93,13 +93,16 @@ public final class SlidingWindows {
}
/**
* Return a window definition with the window size
* Using the method implicitly sets the grace period to zero which means that
* out of order records arriving after the window end will be dropped
* Return a window definition with the window size based on the given maximum time difference (inclusive) between
* records in the same window and given window grace period. Reject out-of-order events that arrive after {@code grace}.
* A window is closed when {@code stream-time > window-end + grace-period}.
* <p>
* CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order
* records arriving after the window ends are considered late and will be dropped.
*
* @param timeDifference the max time difference (inclusive) between two records in a window
* @return a new window definition with no grace period. Note that this means out of order records arriving after the window end will be dropped
* @throws IllegalArgumentException if the timeDifference is negative
* @return a new window definition with no grace period. Note that this means out-of-order records arriving after the window end will be dropped
* @throws IllegalArgumentException if the timeDifference is negative or can't be represented as {@code long milliseconds}
*/
public static SlidingWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) throws IllegalArgumentException {
return ofTimeDifferenceAndGrace(timeDifference, ofMillis(NO_GRACE_PERIOD));
@ -113,12 +116,13 @@ public final class SlidingWindows {
* @param timeDifference the max time difference (inclusive) between two records in a window
* @param afterWindowEnd the grace period to admit out-of-order events to a window
* @return a new window definition with the specified grace period
* @throws IllegalArgumentException if the timeDifference or afterWindowEnd (grace period) is negative
* @throws IllegalArgumentException if the timeDifference or afterWindowEnd (grace period) is negative or can't be represented as {@code long milliseconds}
*/
public static SlidingWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) throws IllegalArgumentException {
final long timeDifferenceMs = timeDifference.toMillis();
final long afterWindowEndMs = afterWindowEnd.toMillis();
final String timeDifferenceMsgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
final long timeDifferenceMs = validateMillisecondDuration(timeDifference, timeDifferenceMsgPrefix);
final String afterWindowEndMsgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
return new SlidingWindows(timeDifferenceMs, afterWindowEndMs);
}
@ -132,7 +136,7 @@ public final class SlidingWindows {
* @param grace the grace period to admit out-of-order events to a window
* @return a new window definition
* @throws IllegalArgumentException if the specified window size is &lt; 0 or grace &lt; 0, or either can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} or {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
* @deprecated since 3.0. Use {@link #ofTimeDifferenceWithNoGrace(Duration)} or {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
*/
@Deprecated
public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference, final Duration grace) throws IllegalArgumentException {

View File

@ -78,6 +78,11 @@ public final class TimeWindows extends Windows<TimeWindow> {
throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
}
if (advanceMs <= 0 || advanceMs > sizeMs) {
throw new IllegalArgumentException(String.format("Window advancement interval should be more than zero " +
"and less than window duration which is %d ms, but given advancement interval is: %d ms", sizeMs, advanceMs));
}
if (graceMs < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
@ -90,11 +95,12 @@ public final class TimeWindows extends Windows<TimeWindow> {
* <p>
* This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
* Tumbling windows are a special case of hopping windows with {@code advance == size}.
* Using the method implicitly sets the grace period to zero which means
* that out of order records arriving after the window end will be dropped
* <p>
* CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order
* records arriving after the window ends are considered late and will be dropped.
*
* @param size The size of the window
* @return a new window definition with default no grace period. Note that this means out of order records arriving after the window end will be dropped
* @return a new window definition with default no grace period. Note that this means out-of-order records arriving after the window end will be dropped
* @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
*/
public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {
@ -108,22 +114,23 @@ public final class TimeWindows extends Windows<TimeWindow> {
* <p>
* This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
* Tumbling windows are a special case of hopping windows with {@code advance == size}.
* Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd} which means
* that out of order records arriving after the window end will be dropped.
*
* <p>
* Delay is defined as (stream_time - record_timestamp).
* Using this method explicitly sets the grace period to the duration specified by {@code afterWindowEnd}, which
* means that only out-of-order records arriving more than the grace period after the window end will be dropped.
* The window close, after which any incoming records are considered late and will be rejected, is defined as
* {@code windowEnd + afterWindowEnd}
*
* @param size The size of the window. Must be larger than zero
* @param afterWindowEnd The grace period to admit out-of-order events to a window. Must be non-negative.
* @return a TimeWindows object with the specified size and the specified grace period
* @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
*/
public static TimeWindows ofSizeAndGrace(final Duration size, final Duration afterWindowEnd)
throws IllegalArgumentException {
public static TimeWindows ofSizeAndGrace(final Duration size, final Duration afterWindowEnd) throws IllegalArgumentException {
final String sizeMsgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
final long sizeMs = validateMillisecondDuration(size, sizeMsgPrefix);
final long sizeMs = size.toMillis();
final long afterWindowEndMs = afterWindowEnd.toMillis();
final String afterWindowEndMsgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs);
}
@ -139,14 +146,14 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @param size The size of the window
* @return a new window definition without specifying the grace period (default to 24 hours minus window {@code size})
* @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofSizeWithNoGrace(Duration)} } instead
* @deprecated since 3.0. Use {@link #ofSizeWithNoGrace(Duration)} } instead
*/
@Deprecated
public static TimeWindows of(final Duration size) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
final long sizeMs = validateMillisecondDuration(size, msgPrefix);
return new TimeWindows(sizeMs, sizeMs, Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - sizeMs, 0));
return new TimeWindows(sizeMs, sizeMs, Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - sizeMs, 0));
}
/**
@ -163,10 +170,6 @@ public final class TimeWindows extends Windows<TimeWindow> {
public TimeWindows advanceBy(final Duration advance) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, "advance");
final long advanceMs = validateMillisecondDuration(advance, msgPrefix);
if (advanceMs <= 0 || advanceMs > sizeMs) {
throw new IllegalArgumentException(String.format("Window advancement interval should be more than zero " +
"and less than window duration which is %d ms, but given advancement interval is: %d ms", sizeMs, advanceMs));
}
return new TimeWindows(sizeMs, advanceMs, graceMs);
}
@ -196,15 +199,13 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @return this updated builder
* @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofSizeAndGrace(Duration, Duration)} instead
* @deprecated since 3.0. Use {@link #ofSizeAndGrace(Duration, Duration)} instead
*/
@Deprecated
public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
//TODO KAFKA-13021: disallow calling grace() if it was already set via ofTimeDifferenceAndGrace/WithNoGrace()
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs);
}

View File

@ -43,7 +43,7 @@ public abstract class Windows<W extends Window> {
* This behavior is now deprecated and additional details are available in the motivation for the KIP
* Check out <a href="https://cwiki.apache.org/confluence/x/Ho2NCg">KIP-633</a> for more details
*/
protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L;
protected static final long DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L;
/**
* This constant is used as the specified grace period where we do not have any grace periods instead of magic constants

View File

@ -24,7 +24,7 @@ import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
@ -122,10 +122,10 @@ public class JoinWindowsTest {
@Test
public void oldAPIShouldSetDefaultGracePeriod() {
assertEquals(Duration.ofDays(1).toMillis(), DEPRECATED_OLD_24_HR_GRACE_PERIOD);
assertEquals(DEPRECATED_OLD_24_HR_GRACE_PERIOD - 6L, JoinWindows.of(ofMillis(3L)).gracePeriodMs());
assertEquals(0L, JoinWindows.of(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD)).gracePeriodMs());
assertEquals(0L, JoinWindows.of(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD + 1L)).gracePeriodMs());
assertEquals(Duration.ofDays(1).toMillis(), DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 6L, JoinWindows.of(ofMillis(3L)).gracePeriodMs());
assertEquals(0L, JoinWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
assertEquals(0L, JoinWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD + 1L)).gracePeriodMs());
}
@Test

View File

@ -23,7 +23,7 @@ import java.time.Duration;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
@ -61,10 +61,10 @@ public class SessionWindowsTest {
@Test
public void oldAPIShouldSetDefaultGracePeriod() {
assertEquals(Duration.ofDays(1).toMillis(), DEPRECATED_OLD_24_HR_GRACE_PERIOD);
assertEquals(DEPRECATED_OLD_24_HR_GRACE_PERIOD - 3L, SessionWindows.with(ofMillis(3L)).gracePeriodMs());
assertEquals(0L, SessionWindows.with(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD)).gracePeriodMs());
assertEquals(0L, SessionWindows.with(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD + 1L)).gracePeriodMs());
assertEquals(Duration.ofDays(1).toMillis(), DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 3L, SessionWindows.with(ofMillis(3L)).gracePeriodMs());
assertEquals(0L, SessionWindows.with(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
assertEquals(0L, SessionWindows.with(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD + 1L)).gracePeriodMs());
}
@Test

View File

@ -25,7 +25,7 @@ import java.util.Map;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
@ -107,10 +107,10 @@ public class TimeWindowsTest {
@Test
public void oldAPIShouldSetDefaultGracePeriod() {
assertEquals(Duration.ofDays(1).toMillis(), DEPRECATED_OLD_24_HR_GRACE_PERIOD);
assertEquals(DEPRECATED_OLD_24_HR_GRACE_PERIOD - 3L, TimeWindows.of(ofMillis(3L)).gracePeriodMs());
assertEquals(0L, TimeWindows.of(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD)).gracePeriodMs());
assertEquals(0L, TimeWindows.of(ofMillis(DEPRECATED_OLD_24_HR_GRACE_PERIOD + 1L)).gracePeriodMs());
assertEquals(Duration.ofDays(1).toMillis(), DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 3L, TimeWindows.of(ofMillis(3L)).gracePeriodMs());
assertEquals(0L, TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
assertEquals(0L, TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD + 1L)).gracePeriodMs());
}
@Test