diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java index 1b467eb2e72..a5086de8c66 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -191,6 +191,8 @@ public class PageViewTypedDemo { final KTable users = builder.table("streams-userprofile-input", Consumed.with(Serdes.String(), new JSONSerde<>())); + final Duration duration24Hours = Duration.ofHours(24); + final KStream regionCount = views .leftJoin(users, (view, profile) -> { final PageViewByRegion viewByRegion = new PageViewByRegion(); @@ -206,7 +208,7 @@ public class PageViewTypedDemo { }) .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion)) .groupByKey(Grouped.with(Serdes.String(), new JSONSerde<>())) - .windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1))) + .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(7), duration24Hours).advanceBy(Duration.ofSeconds(1))) .count() .toStream() .map((key, value) -> { diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index 2a9972b3838..cdb36394a98 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -78,6 +78,8 @@ public class PageViewUntypedDemo { final KTable userRegions = users.mapValues(record -> record.get("region").textValue()); + final Duration duration24Hours = Duration.ofHours(24); + final KStream regionCount = views .leftJoin(userRegions, (view, region) -> { final ObjectNode jNode = JsonNodeFactory.instance.objectNode(); @@ -88,7 +90,7 @@ public class PageViewUntypedDemo { }) .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion)) .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) - .windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1))) + .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(7), duration24Hours).advanceBy(Duration.ofSeconds(1))) .count() .toStream() .map((key, value) -> { diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java index 4d63d306330..6384466f99c 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java @@ -79,6 +79,8 @@ public class TemperatureDemo { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + final Duration duration24Hours = Duration.ofHours(24); + final StreamsBuilder builder = new StreamsBuilder(); final KStream source = builder.stream("iot-temperature"); @@ -88,7 +90,7 @@ public class TemperatureDemo { // to group and reduce them, a key is needed ("temp" has been chosen) .selectKey((key, value) -> "temp") .groupByKey() - .windowedBy(TimeWindows.of(Duration.ofSeconds(TEMPERATURE_WINDOW_SIZE))) + .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(TEMPERATURE_WINDOW_SIZE), duration24Hours)) .reduce((value1, value2) -> { if (Integer.parseInt(value1) > Integer.parseInt(value2)) { return value1; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 26412861cef..84e3f7fc262 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -79,10 +79,7 @@ public class JoinWindows extends Windows { protected final boolean enableSpuriousResultFix; protected JoinWindows(final JoinWindows joinWindows) { - beforeMs = joinWindows.beforeMs; - afterMs = joinWindows.afterMs; - graceMs = joinWindows.graceMs; - enableSpuriousResultFix = joinWindows.enableSpuriousResultFix; + this(joinWindows.beforeMs, joinWindows.afterMs, joinWindows.graceMs, joinWindows.enableSpuriousResultFix); } private JoinWindows(final long beforeMs, @@ -92,32 +89,62 @@ public class JoinWindows extends Windows { if (beforeMs + afterMs < 0) { throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative."); } + + if (graceMs < 0) { + throw new IllegalArgumentException("Grace period must not be negative."); + } + this.afterMs = afterMs; this.beforeMs = beforeMs; this.graceMs = graceMs; this.enableSpuriousResultFix = enableSpuriousResultFix; } + /** + * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}, + * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than + * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to + * the duration specified by {@code afterWindowEnd} which means that out of order records arriving + * after the window end will be dropped. The delay is defined as (stream_time - record_timestamp). + * + * @param timeDifference join window interval + * @param afterWindowEnd The grace period to admit out-of-order events to a window. + * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds} + * @return A new JoinWindows object with the specified window definition and grace period + */ public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) { return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true); } + /** + * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}, + * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than + * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero + * which means that out of order records arriving after the window end will be dropped. + * + * @param timeDifference join window interval + * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds} + * @return a new JoinWindows object with the window definition and no grace period. Note that this means out of order records arriving after the window end will be dropped + */ public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) { - return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), 0L, true); + return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), NO_GRACE_PERIOD, true); } - /** + /** * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}, * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than * the timestamp of the record from the primary stream. * - * @param timeDifference join window interval + * @param timeDifference + * @return a new JoinWindows object with the window definition with and grace period (uses old default of 24 hours) * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds} + * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead */ + @Deprecated public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException { final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference"); final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix); - return new JoinWindows(timeDifferenceMs, timeDifferenceMs, DEFAULT_GRACE_PERIOD_MS, false); + return new JoinWindows(timeDifferenceMs, timeDifferenceMs, DEPRECATED_OLD_24_HR_GRACE_PERIOD, false); } /** @@ -177,7 +204,9 @@ public class JoinWindows extends Windows { * @param afterWindowEnd The grace period to admit out-of-order events to a window. * @return this updated builder * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds} + * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} instead */ + @Deprecated public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException { final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd"); final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java index f41dd67c168..65bcfd0498a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java @@ -23,8 +23,9 @@ import java.util.Objects; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; -import static org.apache.kafka.streams.kstream.Windows.DEFAULT_GRACE_PERIOD_MS; - +import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD; +import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD; +import static java.time.Duration.ofMillis; /** * A session based window specification used for aggregating events into sessions. @@ -78,23 +79,68 @@ public final class SessionWindows { private SessionWindows(final long gapMs, final long graceMs) { this.gapMs = gapMs; this.graceMs = graceMs; + + if (gapMs <= 0) { + throw new IllegalArgumentException("Gap time cannot be zero or negative."); + } + + if (graceMs < 0) { + throw new IllegalArgumentException("Grace period must not be negative."); + } } + /** + * Creates a new window specification with the specified inactivity gap. + * Using the method implicitly sets the grace period to zero which + * means that out of order records arriving after the window end will be dropped + * + *

+ * Note that new events may change the boundaries of session windows, so aggressive + * close times can lead to surprising results in which an out-of-order event is rejected and then + * a subsequent event moves the window boundary forward. + * + * @param inactivityGap the gap of inactivity between sessions + * @return a window definition with the window size and no grace period. Note that this means out of order records arriving after the window end will be dropped + * @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds} + */ + public static SessionWindows ofInactivityGapWithNoGrace(final Duration inactivityGap) { + return ofInactivityGapAndGrace(inactivityGap, ofMillis(NO_GRACE_PERIOD)); + } + + /** + * Creates a new window specification with the specified inactivity gap. + * Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd} which + * means that out of order records arriving after the window end will be dropped + * + *

+ * Note that new events may change the boundaries of session windows, so aggressive + * close times can lead to surprising results in which an out-of-order event is rejected and then + * a subsequent event moves the window boundary forward. + * + * @param inactivityGap the gap of inactivity between sessions + * @param afterWindowEnd The grace period to admit out-of-order events to a window. + * @return A SessionWindows object with the specified inactivity gap and grace period + * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds} + */ + public static SessionWindows ofInactivityGapAndGrace(final Duration inactivityGap, final Duration afterWindowEnd) { + return new SessionWindows(inactivityGap.toMillis(), afterWindowEnd.toMillis()); + } + + /** * Create a new window specification with the specified inactivity gap. * * @param inactivityGap the gap of inactivity between sessions - * @return a new window specification with default maintain duration of 1 day - * + * @return a new window specification without specifying a grace period (uses old default of 24 hours) * @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds} + * @deprecated since 3.0 Use {@link #ofInactivityGapWithNoGrace(Duration)} instead */ + @Deprecated public static SessionWindows with(final Duration inactivityGap) { final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap"); final long inactivityGapMs = validateMillisecondDuration(inactivityGap, msgPrefix); - if (inactivityGapMs <= 0) { - throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative."); - } - return new SessionWindows(inactivityGapMs, DEFAULT_GRACE_PERIOD_MS); + + return new SessionWindows(inactivityGapMs, DEPRECATED_OLD_24_HR_GRACE_PERIOD); } /** @@ -108,13 +154,12 @@ public final class SessionWindows { * @param afterWindowEnd The grace period to admit out-of-order events to a window. * @return this updated builder * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds} + * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, Duration)} instead */ + @Deprecated public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException { final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd"); final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix); - if (afterWindowEndMs < 0) { - throw new IllegalArgumentException("Grace period must not be negative."); - } return new SessionWindows(gapMs, afterWindowEndMs); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java index 189770ff26e..2cbda6d593f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java @@ -17,10 +17,14 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.processor.TimestampExtractor; + import java.time.Duration; import java.util.Objects; + +import static java.time.Duration.ofMillis; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; +import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD; /** * A sliding window used for aggregating events. @@ -78,6 +82,45 @@ public final class SlidingWindows { private SlidingWindows(final long timeDifferenceMs, final long graceMs) { this.timeDifferenceMs = timeDifferenceMs; this.graceMs = graceMs; + + if (timeDifferenceMs < 0) { + throw new IllegalArgumentException("Window time difference must not be negative."); + } + + if (graceMs < 0) { + throw new IllegalArgumentException("Window grace period must not be negative."); + } + } + + /** + * Return a window definition with the window size + * Using the method implicitly sets the grace period to zero which means that + * out of order records arriving after the window end will be dropped + * + * @param timeDifference the max time difference (inclusive) between two records in a window + * @return a new window definition with no grace period. Note that this means out of order records arriving after the window end will be dropped + * @throws IllegalArgumentException if the timeDifference is negative + */ + public static SlidingWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) throws IllegalArgumentException { + return ofTimeDifferenceAndGrace(timeDifference, ofMillis(NO_GRACE_PERIOD)); + } + + /** + * Return a window definition with the window size based on the given maximum time difference (inclusive) between + * records in the same window and given window grace period. Reject out-of-order events that arrive after {@code afterWindowEnd}. + * A window is closed when {@code stream-time > window-end + grace-period}. + * + * @param timeDifference the max time difference (inclusive) between two records in a window + * @param afterWindowEnd the grace period to admit out-of-order events to a window + * @return a new window definition with the specified grace period + * @throws IllegalArgumentException if the timeDifference or afterWindowEnd (grace period) is negative + */ + public static SlidingWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) throws IllegalArgumentException { + + final long timeDifferenceMs = timeDifference.toMillis(); + final long afterWindowEndMs = afterWindowEnd.toMillis(); + + return new SlidingWindows(timeDifferenceMs, afterWindowEndMs); } /** @@ -89,18 +132,16 @@ public final class SlidingWindows { * @param grace the grace period to admit out-of-order events to a window * @return a new window definition * @throws IllegalArgumentException if the specified window size is < 0 or grace < 0, or either can't be represented as {@code long milliseconds} + * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} or {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead */ + @Deprecated public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference, final Duration grace) throws IllegalArgumentException { final String msgPrefixSize = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference"); final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefixSize); - if (timeDifferenceMs < 0) { - throw new IllegalArgumentException("Window time difference must not be negative."); - } + final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, "grace"); final long graceMs = validateMillisecondDuration(grace, msgPrefixGrace); - if (graceMs < 0) { - throw new IllegalArgumentException("Window grace period must not be negative."); - } + return new SlidingWindows(timeDifferenceMs, graceMs); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java index 21495567678..7970085029c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -24,6 +24,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; +import static java.time.Duration.ofMillis; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; @@ -72,6 +73,59 @@ public final class TimeWindows extends Windows { this.sizeMs = sizeMs; this.advanceMs = advanceMs; this.graceMs = graceMs; + + if (sizeMs <= 0) { + throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero."); + } + + if (graceMs < 0) { + throw new IllegalArgumentException("Grace period must not be negative."); + } + } + + /** + * Return a window definition with the given window size, and with the advance interval being equal to the window + * size. + * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}. + *

+ * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows. + * Tumbling windows are a special case of hopping windows with {@code advance == size}. + * Using the method implicitly sets the grace period to zero which means + * that out of order records arriving after the window end will be dropped + * + * @param size The size of the window + * @return a new window definition with default no grace period. Note that this means out of order records arriving after the window end will be dropped + * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds} + */ + public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException { + return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD)); + } + + /** + * Return a window definition with the given window size, and with the advance interval being equal to the window + * size. + * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}. + *

+ * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows. + * Tumbling windows are a special case of hopping windows with {@code advance == size}. + * Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd} which means + * that out of order records arriving after the window end will be dropped. + * + *

+ * Delay is defined as (stream_time - record_timestamp). + * + * @param size The size of the window. Must be larger than zero + * @param afterWindowEnd The grace period to admit out-of-order events to a window. Must be non-negative. + * @return a TimeWindows object with the specified size and the specified grace period + * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds} + */ + public static TimeWindows ofSizeAndGrace(final Duration size, final Duration afterWindowEnd) + throws IllegalArgumentException { + + final long sizeMs = size.toMillis(); + final long afterWindowEndMs = afterWindowEnd.toMillis(); + + return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs); } /** @@ -83,16 +137,16 @@ public final class TimeWindows extends Windows { * Tumbling windows are a special case of hopping windows with {@code advance == size}. * * @param size The size of the window - * @return a new window definition with default maintain duration of 1 day + * @return a new window definition without specifying the grace period (uses old default of 24 hours) * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds} + * @deprecated since 3.0 Use {@link #ofSizeWithNoGrace(Duration)} } instead */ + @Deprecated public static TimeWindows of(final Duration size) throws IllegalArgumentException { final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size"); final long sizeMs = validateMillisecondDuration(size, msgPrefix); - if (sizeMs <= 0) { - throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero."); - } - return new TimeWindows(sizeMs, sizeMs, DEFAULT_GRACE_PERIOD_MS); + + return new TimeWindows(sizeMs, sizeMs, DEPRECATED_OLD_24_HR_GRACE_PERIOD); } /** @@ -142,7 +196,9 @@ public final class TimeWindows extends Windows { * @param afterWindowEnd The grace period to admit out-of-order events to a window. * @return this updated builder * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds} + * @deprecated since 3.0 Use {@link #ofSizeAndGrace(Duration, Duration)} instead */ + @Deprecated public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException { final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd"); final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index bece9e01951..f0204d0409c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -38,9 +38,17 @@ import java.util.Map; */ public abstract class Windows { - // By default grace period is 24 hours for all windows, - // in other words we allow out-of-order data for up to a day - protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L; + /** + * By default grace period is 24 hours for all windows in other words we allow out-of-order data for up to a day + * This behavior is now deprecated and additional details are available in the motivation for the KIP + * Check out KIP-633 for more details + */ + protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L; + + /** + * This constant is used as the specified grace period where we do not have any grace periods instead of magic constants + */ + protected static final long NO_GRACE_PERIOD = 0L; protected Windows() {} diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index 048a55bc376..9a178d7a45c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -62,6 +62,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +@SuppressWarnings("deprecation") public class TopologyTest { private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index c2b39b57b08..fd5da124bb9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -341,6 +341,7 @@ public abstract class AbstractResetIntegrationTest { } } + @SuppressWarnings("deprecation") private Topology setupTopologyWithIntermediateTopic(final boolean useRepartitioned, final String outputTopic2) { final StreamsBuilder builder = new StreamsBuilder(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index eb241c566f6..504d9f6385e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -71,6 +71,7 @@ import static org.junit.Assert.assertTrue; /** * Tests related to internal topics in streams */ +@SuppressWarnings("deprecation") @Category({IntegrationTest.class}) public class InternalTopicIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java index f5ed891afc1..59d8603851b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java @@ -49,6 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThrows; +@SuppressWarnings("deprecation") @Category({IntegrationTest.class}) public class JoinStoreIntegrationTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 8eba6deba4d..4fe35a67daf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -65,6 +65,7 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa * by virtue of having a large commit interval */ @Category({IntegrationTest.class}) +@SuppressWarnings("deprecation") public class KStreamAggregationDedupIntegrationTest { private static final int NUM_BROKERS = 1; private static final long COMMIT_INTERVAL_MS = 300L; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index b7b1f4edfad..1b92ab5551f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -98,7 +98,7 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -@SuppressWarnings("unchecked") +@SuppressWarnings({"unchecked", "deprecation"}) @Category({IntegrationTest.class}) public class KStreamAggregationIntegrationTest { private static final int NUM_BROKERS = 1; @@ -209,6 +209,7 @@ public class KStreamAggregationIntegrationTest { return keyComparison; } + @SuppressWarnings("deprecation") @Test public void shouldReduceWindowed() throws Exception { final long firstBatchTimestamp = mockTime.milliseconds(); @@ -219,6 +220,7 @@ public class KStreamAggregationIntegrationTest { produceMessages(secondBatchTimestamp); final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L); + //noinspection deprecation groupedStream .windowedBy(TimeWindows.of(ofMillis(500L))) .reduce(reducer) @@ -318,6 +320,7 @@ public class KStreamAggregationIntegrationTest { ))); } + @SuppressWarnings("deprecation") @Test public void shouldAggregateWindowed() throws Exception { final long firstTimestamp = mockTime.milliseconds(); @@ -328,6 +331,7 @@ public class KStreamAggregationIntegrationTest { produceMessages(secondTimestamp); final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L); + //noinspection deprecation groupedStream.windowedBy(TimeWindows.of(ofMillis(500L))) .aggregate( initializer, @@ -442,12 +446,14 @@ public class KStreamAggregationIntegrationTest { shouldCountHelper(); } + @SuppressWarnings("deprecation") @Test public void shouldGroupByKey() throws Exception { final long timestamp = mockTime.milliseconds(); produceMessages(timestamp); produceMessages(timestamp); + //noinspection deprecation stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(500L))) .count() @@ -476,6 +482,7 @@ public class KStreamAggregationIntegrationTest { ))); } + @SuppressWarnings("deprecation") @Test public void shouldReduceSlidingWindows() throws Exception { final long firstBatchTimestamp = mockTime.milliseconds(); @@ -487,6 +494,7 @@ public class KStreamAggregationIntegrationTest { produceMessages(thirdBatchTimestamp); final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference); + //noinspection deprecation groupedStream .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L))) .reduce(reducer) @@ -580,6 +588,7 @@ public class KStreamAggregationIntegrationTest { } } + @SuppressWarnings("deprecation") @Test public void shouldAggregateSlidingWindows() throws Exception { final long firstBatchTimestamp = mockTime.milliseconds(); @@ -591,6 +600,7 @@ public class KStreamAggregationIntegrationTest { produceMessages(thirdBatchTimestamp); final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference); + //noinspection deprecation groupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMinutes(5))) .aggregate( initializer, @@ -689,6 +699,7 @@ public class KStreamAggregationIntegrationTest { } + @SuppressWarnings("deprecation") @Test public void shouldCountSessionWindows() throws Exception { final long sessionGap = 5 * 60 * 1000L; @@ -761,6 +772,7 @@ public class KStreamAggregationIntegrationTest { final Map, KeyValue> results = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(13); + //noinspection deprecation builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(SessionWindows.with(ofMillis(sessionGap))) @@ -797,6 +809,7 @@ public class KStreamAggregationIntegrationTest { assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair(1L, t3))); } + @SuppressWarnings("deprecation") @Test public void shouldReduceSessionWindows() throws Exception { final long sessionGap = 1000L; // something to do with time @@ -869,6 +882,7 @@ public class KStreamAggregationIntegrationTest { final Map, KeyValue> results = new HashMap<>(); final CountDownLatch latch = new CountDownLatch(13); final String userSessionsStore = "UserSessionsStore"; + //noinspection deprecation builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(SessionWindows.with(ofMillis(sessionGap))) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java index 4ae4fdba850..c2dee61a57b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java @@ -81,6 +81,7 @@ import static org.junit.Assert.assertTrue; @RunWith(value = Parameterized.class) @Category({IntegrationTest.class}) +@SuppressWarnings("deprecation") public class KStreamRepartitionIntegrationTest { private static final int NUM_BROKERS = 1; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index b8ee31bab3b..84f5cfce483 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -66,6 +66,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @Category({IntegrationTest.class}) +@SuppressWarnings("deprecation") public class MetricsIntegrationTest { private static final int NUM_BROKERS = 1; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 4e9b2b53ebc..d07648be85f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -120,6 +120,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; @Category({IntegrationTest.class}) +@SuppressWarnings("deprecation") public class QueryableStateIntegrationTest { private static final Logger log = LoggerFactory.getLogger(QueryableStateIntegrationTest.class); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java index 648cfdad31b..c698d067722 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java @@ -69,6 +69,7 @@ import static org.hamcrest.Matchers.notNullValue; @Category({IntegrationTest.class}) @RunWith(Parameterized.class) +@SuppressWarnings("deprecation") public class RocksDBMetricsIntegrationTest { private static final int NUM_BROKERS = 3; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java index cf9b6d6cfcc..ca1512f236f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java @@ -28,11 +28,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +@SuppressWarnings("deprecation") public class JoinWindowsTest { private static final long ANY_SIZE = 123L; private static final long ANY_OTHER_SIZE = 456L; // should be larger than anySize + private static final long ANY_GRACE = 1024L; + @SuppressWarnings("deprecation") @Test public void validWindows() { JoinWindows.of(ofMillis(ANY_OTHER_SIZE)) // [ -anyOtherSize ; anyOtherSize ] @@ -69,6 +72,8 @@ public class JoinWindowsTest { @Test public void timeDifferenceMustNotBeNegative() { assertThrows(IllegalArgumentException.class, () -> JoinWindows.of(ofMillis(-1))); + assertThrows(IllegalArgumentException.class, () -> JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(-1))); + assertThrows(IllegalArgumentException.class, () -> JoinWindows.ofTimeDifferenceAndGrace(ofMillis(-1), ofMillis(ANY_GRACE))); } @Test @@ -133,6 +138,16 @@ public class JoinWindowsTest { JoinWindows.of(ofMillis(9)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)), JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)) ); + + verifyEquality( + JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3)), + JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3)) + ); + + verifyEquality( + JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4)), + JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4)) + ); } @Test @@ -162,5 +177,15 @@ public class JoinWindowsTest { JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(9)), JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)) ); + + verifyInEquality( + JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(9)), + JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3)) + ); + + verifyInEquality( + JoinWindows.ofTimeDifferenceAndGrace(ofMillis(9), ofMillis(9)), + JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(9)) + ); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java index 5545fb6ef1a..cad978c0de5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java @@ -38,6 +38,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +@SuppressWarnings("deprecation") public class RepartitionTopicNamingTest { private final KeyValueMapper kvMapper = (k, v) -> k + v; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java index 14104d69161..f38be3cbdd2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java @@ -25,12 +25,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +@SuppressWarnings("deprecation") public class SessionWindowsTest { @Test public void shouldSetWindowGap() { final long anyGap = 42L; + final long anyGrace = 1024L; + assertEquals(anyGap, SessionWindows.with(ofMillis(anyGap)).inactivityGap()); + assertEquals(anyGap, SessionWindows.ofInactivityGapWithNoGrace(ofMillis(anyGap)).inactivityGap()); + assertEquals(anyGap, SessionWindows.ofInactivityGapAndGrace(ofMillis(anyGap), ofMillis(anyGrace)).inactivityGap()); } @Test @@ -66,6 +71,15 @@ public class SessionWindowsTest { public void equalsAndHashcodeShouldBeValidForPositiveCases() { verifyEquality(SessionWindows.with(ofMillis(1)), SessionWindows.with(ofMillis(1))); + verifyEquality(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)), + SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)) + ); + + verifyEquality( + SessionWindows.ofInactivityGapAndGrace(ofMillis(1), ofMillis(11)), + SessionWindows.ofInactivityGapAndGrace(ofMillis(1), ofMillis(11)) + ); + verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6))); verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(7))); @@ -75,6 +89,15 @@ public class SessionWindowsTest { @Test public void equalsAndHashcodeShouldBeValidForNegativeCases() { + + verifyInEquality( + SessionWindows.ofInactivityGapWithNoGrace(ofMillis(9)), + SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1))); + + verifyInEquality( + SessionWindows.ofInactivityGapAndGrace(ofMillis(9), ofMillis(9)), + SessionWindows.ofInactivityGapAndGrace(ofMillis(1), ofMillis(9))); + verifyInEquality(SessionWindows.with(ofMillis(9)), SessionWindows.with(ofMillis(1))); verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(9)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6))); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java index f6c63a304f6..dd06984611e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java @@ -24,13 +24,17 @@ import static org.apache.kafka.streams.EqualityCheck.verifyInEquality; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +@SuppressWarnings("deprecation") public class SlidingWindowsTest { private static final long ANY_SIZE = 123L; + private static final long ANY_GRACE = 1024L; @Test public void shouldSetTimeDifference() { assertEquals(ANY_SIZE, SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE), ofMillis(3)).timeDifferenceMs()); + assertEquals(ANY_SIZE, SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(ANY_SIZE), ofMillis(ANY_GRACE)).timeDifferenceMs()); + assertEquals(ANY_SIZE, SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(ANY_SIZE)).timeDifferenceMs()); } @Test @@ -56,6 +60,16 @@ public class SlidingWindowsTest { SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(grace)), SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(grace)) ); + + verifyEquality( + SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(grace)), + SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(grace)) + ); + + verifyEquality( + SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(timeDifference)), + SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(timeDifference)) + ); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java index 765bad5d68c..25a607d0a96 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java @@ -29,13 +29,17 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +@SuppressWarnings("deprecation") public class TimeWindowsTest { private static final long ANY_SIZE = 123L; + private static final long ANY_GRACE = 1024L; @Test public void shouldSetWindowSize() { assertEquals(ANY_SIZE, TimeWindows.of(ofMillis(ANY_SIZE)).sizeMs); + assertEquals(ANY_SIZE, TimeWindows.ofSizeWithNoGrace(ofMillis(ANY_SIZE)).sizeMs); + assertEquals(ANY_SIZE, TimeWindows.ofSizeAndGrace(ofMillis(ANY_SIZE), ofMillis(ANY_GRACE)).sizeMs); } @Test @@ -140,10 +144,27 @@ public class TimeWindowsTest { TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4)), TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4)) ); + + verifyEquality(TimeWindows.ofSizeWithNoGrace(ofMillis(3)), TimeWindows.ofSizeWithNoGrace(ofMillis(3))); + + verifyEquality(TimeWindows.ofSizeAndGrace(ofMillis(3), ofMillis(33)), + TimeWindows.ofSizeAndGrace(ofMillis(3), ofMillis(33)) + ); } @Test public void equalsAndHashcodeShouldBeValidForNegativeCases() { + + verifyInEquality( + TimeWindows.ofSizeWithNoGrace(ofMillis(9)), + TimeWindows.ofSizeWithNoGrace(ofMillis(3)) + ); + + verifyInEquality( + TimeWindows.ofSizeAndGrace(ofMillis(9), ofMillis(9)), + TimeWindows.ofSizeAndGrace(ofMillis(3), ofMillis(9)) + ); + verifyInEquality(TimeWindows.of(ofMillis(9)), TimeWindows.of(ofMillis(3))); verifyInEquality(TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)), TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1))); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index b710e24b644..eba39a74e83 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -61,7 +61,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; - @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. public class KGroupedStreamImplTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 8464ab960f0..97534531dc7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -100,6 +100,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; + @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. public class KStreamImplTest { @@ -704,6 +705,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("materialized can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullOtherStreamOnJoin() { final NullPointerException exception = assertThrows( @@ -712,6 +714,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("otherStream can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullOtherStreamOnJoinWithStreamJoined() { final NullPointerException exception = assertThrows( @@ -724,6 +727,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("otherStream can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullValueJoinerOnJoin() { final NullPointerException exception = assertThrows( @@ -732,6 +736,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("joiner can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullValueJoinerWithKeyOnJoin() { final NullPointerException exception = assertThrows( @@ -740,6 +745,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("joiner can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullValueJoinerOnJoinWithStreamJoined() { final NullPointerException exception = assertThrows( @@ -752,6 +758,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("joiner can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullValueJoinerWithKeyOnJoinWithStreamJoined() { final NullPointerException exception = assertThrows( @@ -784,6 +791,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("windows can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullStreamJoinedOnJoin() { final NullPointerException exception = assertThrows( @@ -796,6 +804,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("streamJoined can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullOtherStreamOnLeftJoin() { final NullPointerException exception = assertThrows( @@ -804,6 +813,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("otherStream can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullOtherStreamOnLeftJoinWithStreamJoined() { final NullPointerException exception = assertThrows( @@ -816,6 +826,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("otherStream can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullValueJoinerOnLeftJoin() { final NullPointerException exception = assertThrows( @@ -824,6 +835,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("joiner can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullValueJoinerWithKeyOnLeftJoin() { final NullPointerException exception = assertThrows( @@ -832,6 +844,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("joiner can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullValueJoinerOnLeftJoinWithStreamJoined() { final NullPointerException exception = assertThrows( @@ -844,6 +857,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("joiner can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullValueJoinerWithKeyOnLeftJoinWithStreamJoined() { final NullPointerException exception = assertThrows( @@ -877,6 +891,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("windows can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullStreamJoinedOnLeftJoin() { final NullPointerException exception = assertThrows( @@ -889,6 +904,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("streamJoined can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullOtherStreamOnOuterJoin() { final NullPointerException exception = assertThrows( @@ -897,6 +913,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("otherStream can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullOtherStreamOnOuterJoinWithStreamJoined() { final NullPointerException exception = assertThrows( @@ -909,6 +926,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("otherStream can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullValueJoinerOnOuterJoin() { final NullPointerException exception = assertThrows( @@ -917,6 +935,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("joiner can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullValueJoinerWithKeyOnOuterJoin() { final NullPointerException exception = assertThrows( @@ -925,6 +944,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("joiner can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullValueJoinerOnOuterJoinWithStreamJoined() { final NullPointerException exception = assertThrows( @@ -937,6 +957,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("joiner can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullValueJoinerWithKeyOnOuterJoinWithStreamJoined() { final NullPointerException exception = assertThrows( @@ -969,6 +990,7 @@ public class KStreamImplTest { assertThat(exception.getMessage(), equalTo("windows can't be null")); } + @SuppressWarnings("deprecation") @Test public void shouldNotAllowNullStreamJoinedOnOuterJoin() { final NullPointerException exception = assertThrows( @@ -1511,6 +1533,7 @@ public class KStreamImplTest { assertThat(mockProcessors.get(1).processed(), equalTo(Collections.singletonList(new KeyValueTimestamp<>("b", "v1", 0)))); } + @SuppressWarnings("deprecation") @Test public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreatedWithRetention() { final StreamsBuilder builder = new StreamsBuilder(); @@ -1538,6 +1561,7 @@ public class KStreamImplTest { } } + @SuppressWarnings("deprecation") @Test public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() { final StreamsBuilder builder = new StreamsBuilder(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 17575849f30..1d50a3730f3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -63,6 +63,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; + @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. public class KStreamKStreamJoinTest { private final String topic1 = "topic1"; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index eb705f1c8b7..bc20312e7b2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -53,6 +53,7 @@ import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; + @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. public class KStreamKStreamLeftJoinTest { private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0]; @@ -97,6 +98,7 @@ public class KStreamKStreamLeftJoinTest { false ); } + @Test public void testLeftJoinWithSpuriousResultFixDisabledOldApi() { runLeftJoinWithoutSpuriousResultFix( diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java index b4c08274b62..39ed039944a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java @@ -52,6 +52,7 @@ import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; + @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. public class KStreamKStreamOuterJoinTest { private final String topic1 = "topic1"; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java index 04bbda837d6..0344f46db8e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java @@ -59,6 +59,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertThrows; +@SuppressWarnings("deprecation") @RunWith(EasyMockRunner.class) public class KStreamRepartitionTest { private final String inputTopic = "input-topic"; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 3022a360177..f4ebfdd74af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -69,6 +69,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; + @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. public class KStreamSessionWindowAggregateProcessorTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java index cf6efecc03a..798159d3532 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java @@ -711,7 +711,7 @@ public class KStreamSlidingWindowAggregateTest { builder .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) - .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100))) + .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100))) .aggregate(MockInitializer.STRING_INIT, MockAggregator.toStringInstance("+"), Materialized.>as("topic1-Canonicalized").withValueSerde(Serdes.String())); props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion); @@ -743,7 +743,7 @@ public class KStreamSlidingWindowAggregateTest { final KStream stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())); stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) - .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(90))) + .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(90))) .aggregate( MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, @@ -807,7 +807,7 @@ public class KStreamSlidingWindowAggregateTest { final KTable, String> table = builder .stream(topic1, Consumed.with(Serdes.String(), Serdes.String())) .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) - .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(10000))) + .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(10000))) // The aggregator needs to sort the strings so the window value is the same for the final windows even when // records are processed in a different order. Here, we sort alphabetically. .aggregate( diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 39a74447dac..b7759bbcdf4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -62,6 +62,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +@SuppressWarnings("deprecation") public class KStreamWindowAggregateTest { private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private final String threadId = Thread.currentThread().getName(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java index 2946417a871..eee7cc57569 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java @@ -50,7 +50,7 @@ import org.apache.kafka.test.StreamsTestUtils; import org.junit.Before; import org.junit.Test; - +@SuppressWarnings("deprecation") public class SessionWindowedCogroupedKStreamImplTest { private final StreamsBuilder builder = new StreamsBuilder(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java index 96d301decad..52ff8580ad6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java @@ -54,6 +54,7 @@ import org.apache.kafka.test.StreamsTestUtils; import org.junit.Before; import org.junit.Test; +@SuppressWarnings("deprecation") public class SlidingWindowedCogroupedKStreamImplTest { private static final String TOPIC = "topic"; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java index e0b7957e018..7b521ab0619 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -70,6 +70,7 @@ import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +@SuppressWarnings("deprecation") public class SuppressScenarioTest { private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer(); private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java index d775c8986bf..d775796d9aa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java @@ -39,6 +39,7 @@ import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; +@SuppressWarnings("deprecation") public class SuppressTopologyTest { private static final Serde STRING_SERDE = Serdes.String(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java index f905d32838e..c558eab7473 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +@SuppressWarnings("deprecation") public class TimeWindowTest { private long start = 50; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java index d052429767b..cd9ca196ec9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java @@ -51,6 +51,7 @@ import org.apache.kafka.test.StreamsTestUtils; import org.junit.Before; import org.junit.Test; +@SuppressWarnings("deprecation") public class TimeWindowedCogroupedKStreamImplTest { private static final Long WINDOW_SIZE = 500L; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index 8926a1e8a54..b912a96f3f8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -53,6 +53,7 @@ import java.util.regex.Pattern; import static java.time.Duration.ofMillis; import static org.junit.Assert.assertEquals; +@SuppressWarnings("deprecation") public class StreamsGraphTest { private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java index 3b7eb784f7c..d1eb5b52c6f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java @@ -66,6 +66,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +@SuppressWarnings("deprecation") public class RepartitionOptimizingTest { private final Logger log = LoggerFactory.getLogger(RepartitionOptimizingTest.class); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 860ed7383b0..7244bd6e881 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -137,6 +137,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @RunWith(value = Parameterized.class) +@SuppressWarnings("deprecation") public class StreamsPartitionAssignorTest { private static final String CONSUMER_1 = "consumer1"; private static final String CONSUMER_2 = "consumer2"; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index 2bd443781e2..86f7583789a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; +@SuppressWarnings("deprecation") public class SmokeTestClient extends SmokeTestUtil { private final String name; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java index da063c02d9f..714aa110ef3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java @@ -47,6 +47,7 @@ import java.util.regex.Pattern; import static java.time.Duration.ofMillis; +@SuppressWarnings("deprecation") public class StreamsOptimizedTest { public static void main(final String[] args) throws Exception { diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java index d9a2afe0f58..7c3af25ba85 100644 --- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java @@ -58,6 +58,7 @@ public class GenericInMemoryKeyValueStore return this.name; } + @SuppressWarnings("deprecation") @Deprecated @Override /* This is a "dummy" store used for testing; diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java index 2198d181dd8..114ea067f9e 100644 --- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java @@ -37,6 +37,7 @@ import java.util.TreeMap; * This class is a generic version of the in-memory key-value store that is useful for testing when you * need a basic KeyValueStore for arbitrary types and don't have/want to write a serde */ +@SuppressWarnings("deprecation") public class GenericInMemoryTimestampedKeyValueStore extends WrappedStateStore> implements TimestampedKeyValueStore { diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index d3f83e0e20a..d097b859035 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -54,6 +54,7 @@ import scala.jdk.CollectionConverters._ * @param inner The underlying Java abstraction for KStream * @see `org.apache.kafka.streams.kstream.KStream` */ +//noinspection ScalaDeprecation class KStream[K, V](val inner: KStreamJ[K, V]) { /** diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala index 9653ddb1e57..92ca5bd4a17 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala @@ -51,6 +51,7 @@ import scala.jdk.CollectionConverters._ /** * Test suite that verifies that the topology built by the Java and Scala APIs match. */ +//noinspection ScalaDeprecation class TopologyTest { private val inputTopic = "input-topic" @@ -377,14 +378,16 @@ class TopologyTest { mappedStream .filter((k: String, _: String) => k == "A") - .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))( + .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, + JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)))( StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.intSerde) ) .to(JOINED_TOPIC) mappedStream .filter((k: String, _: String) => k == "A") - .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))( + .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, + JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)))( StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.stringSerde) ) .to(JOINED_TOPIC) @@ -433,18 +436,22 @@ class TopologyTest { mappedStream .filter((key, _) => key == "A") - .join[Integer, String](stream2, - valueJoiner2, - JoinWindows.of(Duration.ofMillis(5000)), - StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.Integer)) + .join[Integer, String]( + stream2, + valueJoiner2, + JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)), + StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.Integer) + ) .to(JOINED_TOPIC) mappedStream .filter((key, _) => key == "A") - .join(stream3, - valueJoiner3, - JoinWindows.of(Duration.ofMillis(5000)), - StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.String)) + .join( + stream3, + valueJoiner3, + JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)), + StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.String) + ) .to(JOINED_TOPIC) builder diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala index 1d8a1f1bd02..0ec7b0e2f84 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala @@ -17,9 +17,7 @@ package org.apache.kafka.streams.scala.kstream import java.time.Duration.ofSeconds -import java.time.Instant -import java.util.regex.Pattern - +import java.time.{Duration, Instant} import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.kstream.{ JoinWindows, @@ -192,6 +190,7 @@ class KStreamTest extends TestDriver { testDriver.close() } + //noinspection ScalaDeprecation @Test def testJoinCorrectlyRecords(): Unit = { val builder = new StreamsBuilder() @@ -201,7 +200,9 @@ class KStreamTest extends TestDriver { val stream1 = builder.stream[String, String](sourceTopic1) val stream2 = builder.stream[String, String](sourceTopic2) - stream1.join(stream2)((a, b) => s"$a-$b", JoinWindows.of(ofSeconds(1))).to(sinkTopic) + stream1 + .join(stream2)((a, b) => s"$a-$b", JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(1), Duration.ofHours(24))) + .to(sinkTopic) val now = Instant.now() @@ -464,23 +465,4 @@ class KStreamTest extends TestDriver { val transformNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1) assertEquals("my-name", transformNode.name()) } - - @Test - def testSettingNameOnStream(): Unit = { - val builder = new StreamsBuilder() - val topicsPattern = "t-[A-Za-z0-9-].suffix" - val sinkTopic = "sink" - - builder - .stream[String, String](Pattern.compile(topicsPattern))( - Consumed.`with`[String, String].withName("my-fancy-name") - ) - .to(sinkTopic) - - import scala.jdk.CollectionConverters._ - - val streamNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.head - assertEquals("my-fancy-name", streamNode.name()) - } - } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala index 15e090dc8f9..09a3a7d9087 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala @@ -36,6 +36,7 @@ import java.time.Duration.ofMillis import scala.jdk.CollectionConverters._ +//noinspection ScalaDeprecation class KTableTest extends TestDriver { @Test @@ -166,7 +167,7 @@ class KTableTest extends TestDriver { val builder = new StreamsBuilder() val sourceTopic = "source" val sinkTopic = "sink" - val window = TimeWindows.of(Duration.ofSeconds(1L)) + val window = TimeWindows.ofSizeAndGrace(Duration.ofSeconds(1L), Duration.ofHours(24)) val suppression = JSuppressed.untilTimeLimit[Windowed[String]](Duration.ofSeconds(2L), BufferConfig.unbounded()) val table: KTable[Windowed[String], Long] = builder @@ -224,7 +225,7 @@ class KTableTest extends TestDriver { val builder = new StreamsBuilder() val sourceTopic = "source" val sinkTopic = "sink" - val window = SlidingWindows.withTimeDifferenceAndGrace(ofMillis(1000L), ofMillis(1000L)) + val window = SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(1000L), ofMillis(1000L)) val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded()) val table: KTable[Windowed[String], Long] = builder @@ -262,7 +263,7 @@ class KTableTest extends TestDriver { val builder = new StreamsBuilder() val sourceTopic = "source" val sinkTopic = "sink" - val window = TimeWindows.of(Duration.ofSeconds(1L)).grace(Duration.ofSeconds(1L)) + val window = TimeWindows.ofSizeAndGrace(Duration.ofSeconds(1L), Duration.ofSeconds(1L)) val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded()) val table: KTable[Windowed[String], Long] = builder @@ -321,7 +322,7 @@ class KTableTest extends TestDriver { val sourceTopic = "source" val sinkTopic = "sink" // Very similar to SuppressScenarioTest.shouldSupportFinalResultsForSessionWindows - val window = SessionWindows.`with`(Duration.ofMillis(5L)).grace(Duration.ofMillis(10L)) + val window = SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(5L), Duration.ofMillis(10L)) val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded()) val table: KTable[Windowed[String], Long] = builder