KAFKA-7106: remove deprecated Windows APIs (#10378)

1. Remove all deprecated APIs in KIP-328.
2. Remove deprecated APIs in Windows in KIP-358.

Reviewers: John Roesler <vvcephei@apache.org>
This commit is contained in:
Guozhang Wang 2021-03-28 12:33:40 -07:00 committed by GitHub
parent b8058829bb
commit d5fd491bf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 90 additions and 683 deletions

View File

@ -100,7 +100,8 @@
<li> <code>StreamsBuilder#addGlobalStore</code> (one overload): deprecated in Kafka 1.1.0 (<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74689212">KIP-233</a>).</li>
<li> <code>ProcessorContext#forward</code> (some overloads): deprecated in Kafka 2.0.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).</li>
<li> <code>WindowBytesStoreSupplier#segments</code>: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-319%3A+Replace+segments+with+segmentInterval+in+WindowBytesStoreSupplier">KIP-319</a>).</li>
<li> Overloaded <code>KafkaStreams#close</code> with <code>long</code> typed parameters: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a>).</li>
<li> <code>segments, until, maintainMs</code> on <code>TimeWindows</code>, <code>JoinWindows</code>, and <code>SessionWindows</code>: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables">KIP-328</a>).</li>
<li> Overloaded <code>JoinWindows#of, before, after</code>, <code>SessionWindows#with</code>, <code>TimeWindows#of, advanceBy</code>, <code>UnlimitedWindows#startOn</code> and <code>KafkaStreams#close</code> with <code>long</code> typed parameters: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a>).</li>
<li> Overloaded <code>KafkaStreams#metadataForKey</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance">KIP-535</a>).</li>
<li> Overloaded <code>KafkaStreams#store</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance">KIP-562</a>).</li>
</ul>

View File

@ -24,7 +24,6 @@ import java.util.Objects;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
/**
* The window specifications used for joins.
@ -70,8 +69,6 @@ import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAU
*/
public final class JoinWindows extends Windows<Window> {
private final long maintainDurationMs;
/** Maximum time difference for tuples that are before the join tuple. */
public final long beforeMs;
/** Maximum time difference for tuples that are after the join tuple. */
@ -81,46 +78,13 @@ public final class JoinWindows extends Windows<Window> {
private JoinWindows(final long beforeMs,
final long afterMs,
final long graceMs,
final long maintainDurationMs) {
final long graceMs) {
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
}
this.afterMs = afterMs;
this.beforeMs = beforeMs;
this.graceMs = graceMs;
this.maintainDurationMs = maintainDurationMs;
}
@Deprecated // removing segments from Windows will fix this
private JoinWindows(final long beforeMs,
final long afterMs,
final long graceMs,
final long maintainDurationMs,
final int segments) {
super(segments);
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
}
this.afterMs = afterMs;
this.beforeMs = beforeMs;
this.graceMs = graceMs;
this.maintainDurationMs = maintainDurationMs;
}
/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifferenceMs},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifferenceMs} earlier or later than
* the timestamp of the record from the primary stream.
*
* @param timeDifferenceMs join window interval in milliseconds
* @throws IllegalArgumentException if {@code timeDifferenceMs} is negative
* @deprecated Use {@link #of(Duration)} instead.
*/
@Deprecated
public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException {
// This is a static factory method, so we initialize grace and retention to the defaults.
return new JoinWindows(timeDifferenceMs, timeDifferenceMs, -1L, DEFAULT_RETENTION_MS);
}
/**
@ -133,23 +97,8 @@ public final class JoinWindows extends Windows<Window> {
*/
public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
return of(validateMillisecondDuration(timeDifference, msgPrefix));
}
/**
* Changes the start window boundary to {@code timeDifferenceMs} but keep the end window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifferenceMs} earlier than the timestamp of the record from the primary stream.
* {@code timeDifferenceMs} can be negative but its absolute value must not be larger than current window "after"
* value (which would result in a negative window size).
*
* @param timeDifferenceMs relative window start time in milliseconds
* @throws IllegalArgumentException if the resulting window size is negative
* @deprecated Use {@link #before(Duration)} instead.
*/
@Deprecated
public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException {
return new JoinWindows(timeDifferenceMs, afterMs, graceMs, maintainDurationMs, segments);
final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix);
return new JoinWindows(timeDifferenceMs, timeDifferenceMs, DEFAULT_GRACE_PERIOD_MS);
}
/**
@ -164,23 +113,8 @@ public final class JoinWindows extends Windows<Window> {
*/
public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
return before(validateMillisecondDuration(timeDifference, msgPrefix));
}
/**
* Changes the end window boundary to {@code timeDifferenceMs} but keep the start window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifferenceMs} later than the timestamp of the record from the primary stream.
* {@code timeDifferenceMs} can be negative but its absolute value must not be larger than current window "before"
* value (which would result in a negative window size).
*
* @param timeDifferenceMs relative window end time in milliseconds
* @throws IllegalArgumentException if the resulting window size is negative
* @deprecated Use {@link #after(Duration)} instead
*/
@Deprecated
public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException {
return new JoinWindows(beforeMs, timeDifferenceMs, graceMs, maintainDurationMs, segments);
final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix);
return new JoinWindows(timeDifferenceMs, afterMs, DEFAULT_GRACE_PERIOD_MS);
}
/**
@ -195,7 +129,8 @@ public final class JoinWindows extends Windows<Window> {
*/
public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
return after(validateMillisecondDuration(timeDifference, msgPrefix));
final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix);
return new JoinWindows(beforeMs, timeDifferenceMs, DEFAULT_GRACE_PERIOD_MS);
}
/**
@ -224,55 +159,20 @@ public final class JoinWindows extends Windows<Window> {
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
*/
@SuppressWarnings("deprecation") // removing segments from Windows will fix this
public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
return new JoinWindows(beforeMs, afterMs, afterWindowEndMs, maintainDurationMs, segments);
return new JoinWindows(beforeMs, afterMs, afterWindowEndMs);
}
@Override
public long gracePeriodMs() {
// NOTE: in the future, when we remove maintainMs,
// we should default the grace period to 24h to maintain the default behavior,
// or we can default to (24h - size) if you want to be super accurate.
return graceMs != -1 ? graceMs : maintainMs() - size();
return graceMs;
}
/**
* @param durationMs the window retention time in milliseconds
* @return itself
* @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
* @deprecated since 2.1. Use {@link JoinWindows#grace(Duration)} instead.
*/
@Override
@Deprecated
public JoinWindows until(final long durationMs) throws IllegalArgumentException {
if (durationMs < size()) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
}
return new JoinWindows(beforeMs, afterMs, graceMs, durationMs, segments);
}
/**
* {@inheritDoc}
* <p>
* For {@link TimeWindows} the maintain duration is at least as small as the window size.
*
* @return the window maintain duration
* @deprecated since 2.1. This function should not be used anymore, since {@link JoinWindows#until(long)}
* is deprecated in favor of {@link JoinWindows#grace(Duration)}.
*/
@Override
@Deprecated
public long maintainMs() {
return Math.max(maintainDurationMs, size());
}
@SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public boolean equals(final Object o) {
if (this == o) {
@ -284,26 +184,20 @@ public final class JoinWindows extends Windows<Window> {
final JoinWindows that = (JoinWindows) o;
return beforeMs == that.beforeMs &&
afterMs == that.afterMs &&
maintainDurationMs == that.maintainDurationMs &&
segments == that.segments &&
graceMs == that.graceMs;
}
@SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public int hashCode() {
return Objects.hash(beforeMs, afterMs, graceMs, maintainDurationMs, segments);
return Objects.hash(beforeMs, afterMs, graceMs);
}
@SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public String toString() {
return "JoinWindows{" +
"beforeMs=" + beforeMs +
", afterMs=" + afterMs +
", graceMs=" + graceMs +
", maintainDurationMs=" + maintainDurationMs +
", segments=" + segments +
'}';
}
}

View File

@ -242,7 +242,8 @@ public class Materialized<K, V, S extends StateStore> {
* ({@link Materialized#as(SessionBytesStoreSupplier)} or {@link Materialized#as(WindowBytesStoreSupplier)}).
*
* Note that the retention period must be at least long enough to contain the windowed data's entire life cycle,
* from window-start through window-end, and for the entire grace period.
* from window-start through window-end, and for the entire grace period. If not specified, the retention
* period would be set as the window length (from window-start through window-end) plus the grace period.
*
* @param retention the retention time
* @return itself

View File

@ -17,14 +17,13 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import java.time.Duration;
import java.util.Objects;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
import static org.apache.kafka.streams.kstream.Windows.DEFAULT_GRACE_PERIOD_MS;
/**
@ -73,33 +72,14 @@ import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAU
public final class SessionWindows {
private final long gapMs;
private final long maintainDurationMs;
private final long graceMs;
private SessionWindows(final long gapMs, final long maintainDurationMs, final long graceMs) {
private SessionWindows(final long gapMs, final long graceMs) {
this.gapMs = gapMs;
this.maintainDurationMs = maintainDurationMs;
this.graceMs = graceMs;
}
/**
* Create a new window specification with the specified inactivity gap in milliseconds.
*
* @param inactivityGapMs the gap of inactivity between sessions in milliseconds
* @return a new window specification with default maintain duration of 1 day
*
* @throws IllegalArgumentException if {@code inactivityGapMs} is zero or negative
* @deprecated Use {@link #with(Duration)} instead.
*/
@Deprecated
public static SessionWindows with(final long inactivityGapMs) {
if (inactivityGapMs <= 0) {
throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
}
return new SessionWindows(inactivityGapMs, DEFAULT_RETENTION_MS, -1);
}
/**
* Create a new window specification with the specified inactivity gap.
*
@ -110,27 +90,11 @@ public final class SessionWindows {
*/
public static SessionWindows with(final Duration inactivityGap) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
return with(validateMillisecondDuration(inactivityGap, msgPrefix));
}
/**
* Set the window maintain duration (retention time) in milliseconds.
* This retention time is a guaranteed <i>lower bound</i> for how long a window will be maintained.
*
* @return itself
* @throws IllegalArgumentException if {@code durationMs} is smaller than window gap
*
* @deprecated since 2.1. Use {@link Materialized#retention}
* or directly configure the retention in a store supplier and use
* {@link Materialized#as(SessionBytesStoreSupplier)}.
*/
@Deprecated
public SessionWindows until(final long durationMs) throws IllegalArgumentException {
if (durationMs < gapMs) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
final long inactivityGapMs = validateMillisecondDuration(inactivityGap, msgPrefix);
if (inactivityGapMs <= 0) {
throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
}
return new SessionWindows(gapMs, durationMs, graceMs);
return new SessionWindows(inactivityGapMs, DEFAULT_GRACE_PERIOD_MS);
}
/**
@ -148,24 +112,15 @@ public final class SessionWindows {
public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
return new SessionWindows(
gapMs,
maintainDurationMs,
afterWindowEndMs
);
return new SessionWindows(gapMs, afterWindowEndMs);
}
@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
public long gracePeriodMs() {
// NOTE: in the future, when we remove maintainMs,
// we should default the grace period to 24h to maintain the default behavior,
// or we can default to (24h - gapMs) if you want to be super accurate.
return graceMs != -1 ? graceMs : maintainMs() - inactivityGap();
return graceMs;
}
/**
@ -177,20 +132,6 @@ public final class SessionWindows {
return gapMs;
}
/**
* Return the window maintain duration (retention time) in milliseconds.
* <p>
* For {@code SessionWindows} the maintain duration is at least as small as the window gap.
*
* @return the window maintain duration
* @deprecated since 2.1. Use {@link Materialized#retention} instead.
*/
@Deprecated
public long maintainMs() {
return Math.max(maintainDurationMs, gapMs);
}
@Override
public boolean equals(final Object o) {
if (this == o) {
@ -201,20 +142,18 @@ public final class SessionWindows {
}
final SessionWindows that = (SessionWindows) o;
return gapMs == that.gapMs &&
maintainDurationMs == that.maintainDurationMs &&
graceMs == that.graceMs;
}
@Override
public int hashCode() {
return Objects.hash(gapMs, maintainDurationMs, graceMs);
return Objects.hash(gapMs, graceMs);
}
@Override
public String toString() {
return "SessionWindows{" +
"gapMs=" + gapMs +
", maintainDurationMs=" + maintainDurationMs +
", graceMs=" + graceMs +
'}';
}

View File

@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import java.time.Duration;
import java.util.LinkedHashMap;
@ -27,7 +26,6 @@ import java.util.Objects;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
/**
* The fixed-size time-based window specifications used for aggregations.
@ -57,10 +55,6 @@ import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAU
*/
public final class TimeWindows extends Windows<TimeWindow> {
private static final long EMPTY_GRACE_PERIOD = -1;
private final long maintainDurationMs;
/** The size of the windows in milliseconds. */
@SuppressWarnings("WeakerAccess")
public final long sizeMs;
@ -71,49 +65,13 @@ public final class TimeWindows extends Windows<TimeWindow> {
*/
@SuppressWarnings("WeakerAccess")
public final long advanceMs;
private final long graceMs;
private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs, final long maintainDurationMs) {
private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs) {
this.sizeMs = sizeMs;
this.advanceMs = advanceMs;
this.graceMs = graceMs;
this.maintainDurationMs = maintainDurationMs;
}
/** Private constructor for preserving segments. Can be removed along with Windows.segments. **/
@Deprecated
private TimeWindows(final long sizeMs,
final long advanceMs,
final long graceMs,
final long maintainDurationMs,
final int segments) {
super(segments);
this.sizeMs = sizeMs;
this.advanceMs = advanceMs;
this.graceMs = graceMs;
this.maintainDurationMs = maintainDurationMs;
}
/**
* Return a window definition with the given window size, and with the advance interval being equal to the window
* size.
* The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
* <p>
* This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
* Tumbling windows are a special case of hopping windows with {@code advance == size}.
*
* @param sizeMs The size of the window in milliseconds
* @return a new window definition with default maintain duration of 1 day
* @throws IllegalArgumentException if the specified window size is zero or negative
* @deprecated Use {@link #of(Duration)} instead
*/
@Deprecated
public static TimeWindows of(final long sizeMs) throws IllegalArgumentException {
if (sizeMs <= 0) {
throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
}
// This is a static factory method, so we initialize grace and retention to the defaults.
return new TimeWindows(sizeMs, sizeMs, EMPTY_GRACE_PERIOD, DEFAULT_RETENTION_MS);
}
/**
@ -128,31 +86,13 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @return a new window definition with default maintain duration of 1 day
* @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
*/
@SuppressWarnings("deprecation") // removing #of(final long sizeMs) will fix this
public static TimeWindows of(final Duration size) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
return of(validateMillisecondDuration(size, msgPrefix));
}
/**
* Return a window definition with the original size, but advance ("hop") the window by the given interval, which
* specifies by how much a window moves forward relative to the previous one.
* The time interval represented by the N-th window is: {@code [N * advance, N * advance + size)}.
* <p>
* This provides the semantics of hopping windows, which are fixed-sized, overlapping windows.
*
* @param advanceMs The advance interval ("hop") in milliseconds of the window, with the requirement that {@code 0 < advanceMs <= sizeMs}.
* @return a new window definition with default maintain duration of 1 day
* @throws IllegalArgumentException if the advance interval is negative, zero, or larger than the window size
* @deprecated Use {@link #advanceBy(Duration)} instead
*/
@Deprecated
public TimeWindows advanceBy(final long advanceMs) {
if (advanceMs <= 0 || advanceMs > sizeMs) {
throw new IllegalArgumentException(String.format("Window advancement interval should be more than zero " +
"and less than window duration which is %d ms, but given advancement interval is: %d ms", sizeMs, advanceMs));
final long sizeMs = validateMillisecondDuration(size, msgPrefix);
if (sizeMs <= 0) {
throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
}
return new TimeWindows(sizeMs, advanceMs, graceMs, maintainDurationMs, segments);
return new TimeWindows(sizeMs, sizeMs, DEFAULT_GRACE_PERIOD_MS);
}
/**
@ -166,10 +106,14 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @return a new window definition with default maintain duration of 1 day
* @throws IllegalArgumentException if the advance interval is negative, zero, or larger than the window size
*/
@SuppressWarnings("deprecation") // removing #advanceBy(final long advanceMs) will fix this
public TimeWindows advanceBy(final Duration advance) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, "advance");
return advanceBy(validateMillisecondDuration(advance, msgPrefix));
final long advanceMs = validateMillisecondDuration(advance, msgPrefix);
if (advanceMs <= 0 || advanceMs > sizeMs) {
throw new IllegalArgumentException(String.format("Window advancement interval should be more than zero " +
"and less than window duration which is %d ms, but given advancement interval is: %d ms", sizeMs, advanceMs));
}
return new TimeWindows(sizeMs, advanceMs, graceMs);
}
@Override
@ -199,7 +143,6 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @return this updated builder
* @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
*/
@SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
@ -207,53 +150,14 @@ public final class TimeWindows extends Windows<TimeWindow> {
throw new IllegalArgumentException("Grace period must not be negative.");
}
return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, maintainDurationMs, segments);
return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs);
}
@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
@Override
public long gracePeriodMs() {
// NOTE: in the future, when we remove maintainMs,
// we should default the grace period to 24h to maintain the default behavior,
// or we can default to (24h - size) if you want to be super accurate.
if (graceMs != EMPTY_GRACE_PERIOD) {
return graceMs;
}
return Math.max(maintainDurationMs - sizeMs, 0);
return graceMs;
}
/**
* @param durationMs the window retention time
* @return itself
* @throws IllegalArgumentException if {@code duration} is smaller than the window size
*
* @deprecated since 2.1. Use {@link Materialized#retention} or directly configure the retention in a store supplier
* and use {@link Materialized#as(WindowBytesStoreSupplier)}.
*/
@Override
@Deprecated
public TimeWindows until(final long durationMs) throws IllegalArgumentException {
if (durationMs < sizeMs) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
}
return new TimeWindows(sizeMs, advanceMs, graceMs, durationMs, segments);
}
/**
* {@inheritDoc}
* <p>
* For {@code TimeWindows} the maintain duration is at least as small as the window size.
*
* @return the window maintain duration
* @deprecated since 2.1. Use {@link Materialized#retention} instead.
*/
@Override
@Deprecated
public long maintainMs() {
return Math.max(maintainDurationMs, sizeMs + gracePeriodMs());
}
@SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public boolean equals(final Object o) {
if (this == o) {
@ -263,28 +167,22 @@ public final class TimeWindows extends Windows<TimeWindow> {
return false;
}
final TimeWindows that = (TimeWindows) o;
return maintainDurationMs == that.maintainDurationMs &&
segments == that.segments &&
sizeMs == that.sizeMs &&
return sizeMs == that.sizeMs &&
advanceMs == that.advanceMs &&
graceMs == that.graceMs;
}
@SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public int hashCode() {
return Objects.hash(maintainDurationMs, segments, sizeMs, advanceMs, graceMs);
return Objects.hash(sizeMs, advanceMs, graceMs);
}
@SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public String toString() {
return "TimeWindows{" +
"maintainDurationMs=" + maintainDurationMs +
", sizeMs=" + sizeMs +
", advanceMs=" + advanceMs +
", graceMs=" + graceMs +
", segments=" + segments +
'}';
}
}

View File

@ -61,22 +61,6 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
return new UnlimitedWindows(DEFAULT_START_TIMESTAMP_MS);
}
/**
* Return a new unlimited window for the specified start timestamp.
*
* @param startMs the window start time
* @return a new unlimited window that starts at {@code startMs}
* @throws IllegalArgumentException if the start time is negative
* @deprecated Use {@link #startOn(Instant)} instead
*/
@Deprecated
public UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException {
if (startMs < 0) {
throw new IllegalArgumentException("Window start time (startMs) cannot be negative.");
}
return new UnlimitedWindows(startMs);
}
/**
* Return a new unlimited window for the specified start timestamp.
*
@ -86,7 +70,11 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
*/
public UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(start, "start");
return startOn(ApiUtils.validateMillisecondInstant(start, msgPrefix));
final long startMs = ApiUtils.validateMillisecondInstant(start, msgPrefix);
if (startMs < 0) {
throw new IllegalArgumentException("Window start time (startMs) cannot be negative.");
}
return new UnlimitedWindows(startMs);
}
@Override
@ -112,38 +100,11 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
return Long.MAX_VALUE;
}
/**
* Throws an {@link IllegalArgumentException} because the retention time for unlimited windows is always infinite
* and cannot be changed.
*
* @throws IllegalArgumentException on every invocation.
* @deprecated since 2.1.
*/
@Override
@Deprecated
public UnlimitedWindows until(final long durationMs) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be set for UnlimitedWindows.");
}
/**
* {@inheritDoc}
* The retention time for unlimited windows in infinite and thus represented as {@link Long#MAX_VALUE}.
*
* @return the window retention time that is {@link Long#MAX_VALUE}
* @deprecated since 2.1. Use {@link Materialized#retention} instead.
*/
@Override
@Deprecated
public long maintainMs() {
return Long.MAX_VALUE;
}
@Override
public long gracePeriodMs() {
return 0L;
}
@SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public boolean equals(final Object o) {
if (this == o) {
@ -153,21 +114,18 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
return false;
}
final UnlimitedWindows that = (UnlimitedWindows) o;
return startMs == that.startMs && segments == that.segments;
return startMs == that.startMs;
}
@SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public int hashCode() {
return Objects.hash(startMs, segments);
return Objects.hash(startMs);
}
@SuppressWarnings("deprecation") // removing segments from Windows will fix this
@Override
public String toString() {
return "UnlimitedWindows{" +
"startMs=" + startMs +
", segments=" + segments +
'}';
}
}

View File

@ -17,13 +17,9 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import java.time.Duration;
import java.util.Map;
import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
/**
* The window specification for fixed size windows that is used to define window boundaries and grace period.
* <p>
@ -42,66 +38,12 @@ import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAU
*/
public abstract class Windows<W extends Window> {
private long maintainDurationMs = DEFAULT_RETENTION_MS;
@Deprecated public int segments = 3;
// By default grace period is 24 hours for all windows,
// in other words we allow out-of-order data for up to a day
protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L;
protected Windows() {}
@Deprecated // remove this constructor when we remove segments.
Windows(final int segments) {
this.segments = segments;
}
/**
* Set the window maintain duration (retention time) in milliseconds.
* This retention time is a guaranteed <i>lower bound</i> for how long a window will be maintained.
*
* @param durationMs the window retention time in milliseconds
* @return itself
* @throws IllegalArgumentException if {@code durationMs} is negative
* @deprecated since 2.1. Use {@link Materialized#withRetention(Duration)}
* or directly configure the retention in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
*/
@Deprecated
public Windows<W> until(final long durationMs) throws IllegalArgumentException {
if (durationMs < 0) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be negative.");
}
maintainDurationMs = durationMs;
return this;
}
/**
* Return the window maintain duration (retention time) in milliseconds.
*
* @return the window maintain duration
* @deprecated since 2.1. Use {@link Materialized#retention} instead.
*/
@Deprecated
public long maintainMs() {
return maintainDurationMs;
}
/**
* Set the number of segments to be used for rolling the window store.
* This function is not exposed to users but can be called by developers that extend this class.
*
* @param segments the number of segments to be used
* @return itself
* @throws IllegalArgumentException if specified segments is small than 2
* @deprecated since 2.1 Override segmentInterval() instead.
*/
@Deprecated
protected Windows<W> segments(final int segments) throws IllegalArgumentException {
if (segments < 2) {
throw new IllegalArgumentException("Number of segments must be at least 2.");
}
this.segments = segments;
return this;
}
/**
* Create all windows that contain the provided timestamp, indexed by non-negative window start timestamps.
*

View File

@ -106,15 +106,11 @@ public class SessionWindowedCogroupedKStreamImpl<K, V> extends
sessionMerger);
}
@SuppressWarnings("deprecation") // continuing to support SessionWindows#maintainMs in fallback mode
private StoreBuilder<SessionStore<K, V>> materialize(final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materialized) {
SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
// NOTE: in the future, when we remove sessionWindows#maintainMs(), we should set the default retention
// to be (sessionWindows.inactivityGap() + sessionWindows.grace()). This will yield the same default behavior.
final long retentionPeriod = materialized.retention() != null ? materialized.retention().toMillis() : sessionWindows.maintainMs();
final long retentionPeriod = materialized.retention() != null ?
materialized.retention().toMillis() : sessionWindows.inactivityGap() + sessionWindows.gracePeriodMs();
if ((sessionWindows.inactivityGap() + sessionWindows.gracePeriodMs()) > retentionPeriod) {
throw new IllegalArgumentException("The retention period of the session store "

View File

@ -222,13 +222,11 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
materializedInternal.valueSerde());
}
@SuppressWarnings("deprecation") // continuing to support SessionWindows#maintainMs in fallback mode
private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materialized) {
SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
// NOTE: in the future, when we remove Windows#maintainMs(), we should set the default retention
// to be (windows.inactivityGap() + windows.grace()). This will yield the same default behavior.
final long retentionPeriod = materialized.retention() != null ? materialized.retention().toMillis() : windows.maintainMs();
final long retentionPeriod = materialized.retention() != null ?
materialized.retention().toMillis() : windows.inactivityGap() + windows.gracePeriodMs();
if ((windows.inactivityGap() + windows.gracePeriodMs()) > retentionPeriod) {
throw new IllegalArgumentException("The retention period of the session store "

View File

@ -33,7 +33,6 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import java.time.Duration;
import java.util.Map;
@ -103,18 +102,15 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
windows);
}
@SuppressWarnings("deprecation")
// continuing to support Windows#maintainMs/segmentInterval in fallback mode
private StoreBuilder<TimestampedWindowStore<K, V>> materialize(
final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized) {
WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
if (materialized.retention() != null) {
// new style retention: use Materialized retention and default segmentInterval
final long retentionPeriod = materialized.retention().toMillis();
final long retentionPeriod = materialized.retention() != null ?
materialized.retention().toMillis() : windows.size() + windows.gracePeriodMs();
if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
throw new IllegalArgumentException("The retention period of the window store "
if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
throw new IllegalArgumentException("The retention period of the window store "
+ name
+ " must be no smaller than its window size plus the grace period."
+ " Got size=[" + windows.size() + "],"
@ -122,41 +118,16 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends Window> extends Ab
+ "],"
+ " retention=[" + retentionPeriod
+ "]");
}
}
supplier = Stores.persistentTimestampedWindowStore(
supplier = Stores.persistentTimestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.size()),
false
);
} else {
// old style retention: use deprecated Windows retention/segmentInterval.
// NOTE: in the future, when we remove Windows#maintainMs(), we should set the default retention
// to be (windows.size() + windows.grace()). This will yield the same default behavior.
if ((windows.size() + windows.gracePeriodMs()) > windows.maintainMs()) {
throw new IllegalArgumentException("The retention period of the window store "
+ name
+ " must be no smaller than its window size plus the grace period."
+ " Got size=[" + windows.size() + "],"
+ " grace=[" + windows.gracePeriodMs()
+ "],"
+ " retention=[" + windows.maintainMs()
+ "]");
}
supplier = new RocksDbWindowBytesStoreSupplier(
materialized.storeName(),
windows.maintainMs(),
Math.max(windows.maintainMs() / (windows.segments - 1), 60_000L),
windows.size(),
false,
true);
}
);
}
final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores
.timestampedWindowStoreBuilder(
supplier,

View File

@ -35,7 +35,6 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import java.time.Duration;
import java.util.Objects;
@ -211,52 +210,28 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
}
@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
private <VR> StoreBuilder<TimestampedWindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
if (materialized.retention() != null) {
// new style retention: use Materialized retention and default segmentInterval
final long retentionPeriod = materialized.retention().toMillis();
final long retentionPeriod = materialized.retention() != null ?
materialized.retention().toMillis() : windows.size() + windows.gracePeriodMs();
if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
throw new IllegalArgumentException("The retention period of the window store "
+ name + " must be no smaller than its window size plus the grace period."
+ " Got size=[" + windows.size() + "],"
+ " grace=[" + windows.gracePeriodMs() + "],"
+ " retention=[" + retentionPeriod + "]");
}
if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
throw new IllegalArgumentException("The retention period of the window store "
+ name + " must be no smaller than its window size plus the grace period."
+ " Got size=[" + windows.size() + "],"
+ " grace=[" + windows.gracePeriodMs() + "],"
+ " retention=[" + retentionPeriod + "]");
}
supplier = Stores.persistentTimestampedWindowStore(
supplier = Stores.persistentTimestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.size()),
false
);
} else {
// old style retention: use deprecated Windows retention/segmentInterval.
// NOTE: in the future, when we remove Windows#maintainMs(), we should set the default retention
// to be (windows.size() + windows.grace()). This will yield the same default behavior.
if ((windows.size() + windows.gracePeriodMs()) > windows.maintainMs()) {
throw new IllegalArgumentException("The retention period of the window store "
+ name + " must be no smaller than its window size plus the grace period."
+ " Got size=[" + windows.size() + "],"
+ " grace=[" + windows.gracePeriodMs() + "],"
+ " retention=[" + windows.maintainMs() + "]");
}
supplier = new RocksDbWindowBytesStoreSupplier(
materialized.storeName(),
windows.maintainMs(),
Math.max(windows.maintainMs() / (windows.segments - 1), 60_000L),
windows.size(),
false,
true);
}
);
}
final StoreBuilder<TimestampedWindowStore<K, VR>> builder = Stores.timestampedWindowStoreBuilder(
supplier,
materialized.keySerde(),

View File

@ -1,23 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
public final class WindowingDefaults {
private WindowingDefaults() {}
public static final long DEFAULT_RETENTION_MS = 24 * 60 * 60 * 1000L; // one day
}

View File

@ -25,7 +25,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
@SuppressWarnings("deprecation")
public class JoinWindowsTest {
private static final long ANY_SIZE = 123L;
@ -80,19 +79,6 @@ public class JoinWindowsTest {
assertEquals(windowSize, windowSpec.grace(ofMillis(windowSize)).gracePeriodMs());
}
@Deprecated
@Test
public void retentionTimeMustNoBeSmallerThanWindowSize() {
final JoinWindows windowSpec = JoinWindows.of(ofMillis(ANY_SIZE));
final long windowSize = windowSpec.size();
try {
windowSpec.until(windowSize - 1);
fail("should not accept retention time smaller than window size");
} catch (final IllegalArgumentException e) {
// expected
}
}
@Test
public void gracePeriodShouldEnforceBoundaries() {
JoinWindows.of(ofMillis(3L)).grace(ofMillis(0L));

View File

@ -25,7 +25,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
@SuppressWarnings("deprecation")
public class SessionWindowsTest {
@Test
@ -63,25 +62,6 @@ public class SessionWindowsTest {
assertThrows(IllegalArgumentException.class, () -> SessionWindows.with(ofMillis(0)));
}
@SuppressWarnings("deprecation") // specifically testing deprecated apis
@Test
public void retentionTimeShouldBeGapIfGapIsLargerThanDefaultRetentionTime() {
final long windowGap = 2 * SessionWindows.with(ofMillis(1)).maintainMs();
assertEquals(windowGap, SessionWindows.with(ofMillis(windowGap)).maintainMs());
}
@Deprecated
@Test
public void retentionTimeMustNotBeNegative() {
final SessionWindows windowSpec = SessionWindows.with(ofMillis(42));
try {
windowSpec.until(41);
fail("should not accept retention time smaller than gap");
} catch (final IllegalArgumentException e) {
// expected
}
}
@Test
public void equalsAndHashcodeShouldBeValidForPositiveCases() {
verifyEquality(SessionWindows.with(ofMillis(1)), SessionWindows.with(ofMillis(1)));

View File

@ -19,10 +19,8 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.junit.Test;
import java.time.Duration;
import java.util.Map;
import static java.time.Duration.ofDays;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
@ -31,7 +29,6 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
@SuppressWarnings("deprecation")
public class TimeWindowsTest {
private static final long ANY_SIZE = 123L;
@ -47,26 +44,6 @@ public class TimeWindowsTest {
assertEquals(anyAdvance, TimeWindows.of(ofMillis(ANY_SIZE)).advanceBy(ofMillis(anyAdvance)).advanceMs);
}
@SuppressWarnings("deprecation") // specifically testing deprecated APIs
@Test
public void shouldSetWindowRetentionTime() {
assertEquals(ANY_SIZE, TimeWindows.of(ofMillis(ANY_SIZE)).until(ANY_SIZE).maintainMs());
}
@SuppressWarnings("deprecation") // specifically testing deprecated APIs
@Test
public void shouldUseWindowSizeAsRetentionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() {
final long windowSize = 2 * TimeWindows.of(ofMillis(1)).maintainMs();
assertEquals(windowSize, TimeWindows.of(ofMillis(windowSize)).maintainMs());
}
@Test
public void shouldUseWindowSizeAndGraceAsRetentionTimeIfBothCombinedAreLargerThanDefaultRetentionTime() {
final Duration windowsSize = ofDays(1).minus(ofMillis(1));
final Duration gracePeriod = ofMillis(2);
assertEquals(windowsSize.toMillis() + gracePeriod.toMillis(), TimeWindows.of(windowsSize).grace(gracePeriod).maintainMs());
}
@Test
public void windowSizeMustNotBeZero() {
assertThrows(IllegalArgumentException.class, () -> TimeWindows.of(ofMillis(0)));
@ -99,7 +76,6 @@ public class TimeWindowsTest {
}
}
@Deprecated
@Test
public void advanceIntervalMustNotBeLargerThanWindowSize() {
final TimeWindows windowSpec = TimeWindows.of(ofMillis(ANY_SIZE));
@ -111,18 +87,6 @@ public class TimeWindowsTest {
}
}
@Deprecated
@Test
public void retentionTimeMustNoBeSmallerThanWindowSize() {
final TimeWindows windowSpec = TimeWindows.of(ofMillis(ANY_SIZE));
try {
windowSpec.until(ANY_SIZE - 1);
fail("should not accept retention time smaller than window size");
} catch (final IllegalArgumentException e) {
// expected
}
}
@Test
public void gracePeriodShouldEnforceBoundaries() {
TimeWindows.of(ofMillis(3L)).grace(ofMillis(0L));

View File

@ -27,15 +27,14 @@ import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class UnlimitedWindowsTest {
private static long anyStartTime = 10L;
private static final long ANY_START_TIME = 10L;
@Test
public void shouldSetWindowStartTime() {
assertEquals(anyStartTime, UnlimitedWindows.of().startOn(ofEpochMilli(anyStartTime)).startMs);
assertEquals(ANY_START_TIME, UnlimitedWindows.of().startOn(ofEpochMilli(ANY_START_TIME)).startMs);
}
@Test
@ -43,38 +42,26 @@ public class UnlimitedWindowsTest {
assertThrows(IllegalArgumentException.class, () -> UnlimitedWindows.of().startOn(ofEpochMilli(-1)));
}
@SuppressWarnings("deprecation")
@Test
public void shouldThrowOnUntil() {
final UnlimitedWindows windowSpec = UnlimitedWindows.of();
try {
windowSpec.until(42);
fail("should not allow to set window retention time");
} catch (final IllegalArgumentException e) {
// expected
}
}
@Test
public void shouldIncludeRecordsThatHappenedOnWindowStart() {
final UnlimitedWindows w = UnlimitedWindows.of().startOn(ofEpochMilli(anyStartTime));
final UnlimitedWindows w = UnlimitedWindows.of().startOn(ofEpochMilli(ANY_START_TIME));
final Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(w.startMs);
assertEquals(1, matchedWindows.size());
assertEquals(new UnlimitedWindow(anyStartTime), matchedWindows.get(anyStartTime));
assertEquals(new UnlimitedWindow(ANY_START_TIME), matchedWindows.get(ANY_START_TIME));
}
@Test
public void shouldIncludeRecordsThatHappenedAfterWindowStart() {
final UnlimitedWindows w = UnlimitedWindows.of().startOn(ofEpochMilli(anyStartTime));
final UnlimitedWindows w = UnlimitedWindows.of().startOn(ofEpochMilli(ANY_START_TIME));
final long timestamp = w.startMs + 1;
final Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp);
assertEquals(1, matchedWindows.size());
assertEquals(new UnlimitedWindow(anyStartTime), matchedWindows.get(anyStartTime));
assertEquals(new UnlimitedWindow(ANY_START_TIME), matchedWindows.get(ANY_START_TIME));
}
@Test
public void shouldExcludeRecordsThatHappenedBeforeWindowStart() {
final UnlimitedWindows w = UnlimitedWindows.of().startOn(ofEpochMilli(anyStartTime));
final UnlimitedWindows w = UnlimitedWindows.of().startOn(ofEpochMilli(ANY_START_TIME));
final long timestamp = w.startMs - 1;
final Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp);
assertTrue(matchedWindows.isEmpty());
@ -85,7 +72,6 @@ public class UnlimitedWindowsTest {
verifyEquality(UnlimitedWindows.of(), UnlimitedWindows.of());
verifyEquality(UnlimitedWindows.of().startOn(ofEpochMilli(1)), UnlimitedWindows.of().startOn(ofEpochMilli(1)));
}
@Test

View File

@ -1,63 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.junit.Test;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
public class WindowsTest {
private class TestWindows extends Windows {
@Override
public Map windowsFor(final long timestamp) {
return null;
}
@Override
public long size() {
return 0;
}
public long gracePeriodMs() {
return 0L;
}
}
@SuppressWarnings("deprecation") // specifically testing deprecated APIs
@Test
public void shouldSetWindowRetentionTime() {
final int anyNotNegativeRetentionTime = 42;
assertEquals(anyNotNegativeRetentionTime, new TestWindows().until(anyNotNegativeRetentionTime).maintainMs());
}
@SuppressWarnings("deprecation") // specifically testing deprecated APIs
@Test
public void numberOfSegmentsMustBeAtLeastTwo() {
assertThrows(IllegalArgumentException.class, () -> new TestWindows().segments(1));
}
@SuppressWarnings("deprecation") // specifically testing deprecated APIs
@Test
public void retentionTimeMustNotBeNegative() {
assertThrows(IllegalArgumentException.class, () -> new TestWindows().until(-1));
}
}

View File

@ -46,6 +46,7 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.Matcher;
import org.junit.Test;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
@ -317,18 +318,21 @@ public class KStreamWindowAggregateTest {
shouldLogAndMeterWhenSkippingExpiredWindow(StreamsConfig.METRICS_0100_TO_24);
}
@Deprecated // testing deprecated functionality (behavior of until)
private void shouldLogAndMeterWhenSkippingExpiredWindow(final String builtInMetricsVersion) {
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).grace(ofMillis(90)))
.aggregate(
() -> "",
MockAggregator.toStringInstance("+"),
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized")
.withValueSerde(Serdes.String())
.withCachingDisabled()
.withLoggingDisabled()
.withRetention(Duration.ofMillis(100))
)
.toStream()
.map((key, value) -> new KeyValue<>(key.toString(), value))

View File

@ -185,8 +185,8 @@ public class RepartitionOptimizingTest {
.filter((k, v) -> k.equals("A"), Named.as("join-filter"))
.join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
JoinWindows.of(ofMillis(5000)),
StreamJoined.<String, String, Long>with(Stores.inMemoryWindowStore("join-store", ofDays(1), ofMillis(10000), true),
Stores.inMemoryWindowStore("other-join-store", ofDays(1), ofMillis(10000), true))
StreamJoined.<String, String, Long>with(Stores.inMemoryWindowStore("join-store", ofDays(1).plus(ofMillis(10000)), ofMillis(10000), true),
Stores.inMemoryWindowStore("other-join-store", ofDays(1).plus(ofMillis(10000)), ofMillis(10000), true))
.withName("join")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())