KAFKA-8613: New APIs for Controlling Grace Period for Windowed Operations (#10926)

Implements KIP-633.

Grace-period is an important parameter and its best to make it the user's responsibility to set it expliclity. Thus, we move off to provide a default and make it a mandatory parameter when creating a window.

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Luke Chen <showuon@gmail.com>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Israel Ekpo 2021-06-30 20:09:19 -04:00 committed by GitHub
parent a10e4e8547
commit b3905d9f71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 404 additions and 79 deletions

View File

@ -191,6 +191,8 @@ public class PageViewTypedDemo {
final KTable<String, UserProfile> users = builder.table("streams-userprofile-input", Consumed.with(Serdes.String(), new JSONSerde<>()));
final Duration duration24Hours = Duration.ofHours(24);
final KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
.leftJoin(users, (view, profile) -> {
final PageViewByRegion viewByRegion = new PageViewByRegion();
@ -206,7 +208,7 @@ public class PageViewTypedDemo {
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
.groupByKey(Grouped.with(Serdes.String(), new JSONSerde<>()))
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(7), duration24Hours).advanceBy(Duration.ofSeconds(1)))
.count()
.toStream()
.map((key, value) -> {

View File

@ -78,6 +78,8 @@ public class PageViewUntypedDemo {
final KTable<String, String> userRegions = users.mapValues(record -> record.get("region").textValue());
final Duration duration24Hours = Duration.ofHours(24);
final KStream<JsonNode, JsonNode> regionCount = views
.leftJoin(userRegions, (view, region) -> {
final ObjectNode jNode = JsonNodeFactory.instance.objectNode();
@ -88,7 +90,7 @@ public class PageViewUntypedDemo {
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
.groupByKey(Grouped.with(Serdes.String(), jsonSerde))
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(7), duration24Hours).advanceBy(Duration.ofSeconds(1)))
.count()
.toStream()
.map((key, value) -> {

View File

@ -79,6 +79,8 @@ public class TemperatureDemo {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
final Duration duration24Hours = Duration.ofHours(24);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> source = builder.stream("iot-temperature");
@ -88,7 +90,7 @@ public class TemperatureDemo {
// to group and reduce them, a key is needed ("temp" has been chosen)
.selectKey((key, value) -> "temp")
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(TEMPERATURE_WINDOW_SIZE)))
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(TEMPERATURE_WINDOW_SIZE), duration24Hours))
.reduce((value1, value2) -> {
if (Integer.parseInt(value1) > Integer.parseInt(value2)) {
return value1;

View File

@ -79,10 +79,7 @@ public class JoinWindows extends Windows<Window> {
protected final boolean enableSpuriousResultFix;
protected JoinWindows(final JoinWindows joinWindows) {
beforeMs = joinWindows.beforeMs;
afterMs = joinWindows.afterMs;
graceMs = joinWindows.graceMs;
enableSpuriousResultFix = joinWindows.enableSpuriousResultFix;
this(joinWindows.beforeMs, joinWindows.afterMs, joinWindows.graceMs, joinWindows.enableSpuriousResultFix);
}
private JoinWindows(final long beforeMs,
@ -92,32 +89,62 @@ public class JoinWindows extends Windows<Window> {
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
}
if (graceMs < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
this.afterMs = afterMs;
this.beforeMs = beforeMs;
this.graceMs = graceMs;
this.enableSpuriousResultFix = enableSpuriousResultFix;
}
/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
* the duration specified by {@code afterWindowEnd} which means that out of order records arriving
* after the window end will be dropped. The delay is defined as (stream_time - record_timestamp).
*
* @param timeDifference join window interval
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
* @return A new JoinWindows object with the specified window definition and grace period
*/
public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
}
/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
* which means that out of order records arriving after the window end will be dropped.
*
* @param timeDifference join window interval
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
* @return a new JoinWindows object with the window definition and no grace period. Note that this means out of order records arriving after the window end will be dropped
*/
public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), 0L, true);
return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), NO_GRACE_PERIOD, true);
}
/**
/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* the timestamp of the record from the primary stream.
*
* @param timeDifference join window interval
* @param timeDifference
* @return a new JoinWindows object with the window definition with and grace period (uses old default of 24 hours)
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
*/
@Deprecated
public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix);
return new JoinWindows(timeDifferenceMs, timeDifferenceMs, DEFAULT_GRACE_PERIOD_MS, false);
return new JoinWindows(timeDifferenceMs, timeDifferenceMs, DEPRECATED_OLD_24_HR_GRACE_PERIOD, false);
}
/**
@ -177,7 +204,9 @@ public class JoinWindows extends Windows<Window> {
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} instead
*/
@Deprecated
public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);

View File

@ -23,8 +23,9 @@ import java.util.Objects;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static org.apache.kafka.streams.kstream.Windows.DEFAULT_GRACE_PERIOD_MS;
import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_OLD_24_HR_GRACE_PERIOD;
import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
import static java.time.Duration.ofMillis;
/**
* A session based window specification used for aggregating events into sessions.
@ -78,23 +79,68 @@ public final class SessionWindows {
private SessionWindows(final long gapMs, final long graceMs) {
this.gapMs = gapMs;
this.graceMs = graceMs;
if (gapMs <= 0) {
throw new IllegalArgumentException("Gap time cannot be zero or negative.");
}
if (graceMs < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
}
/**
* Creates a new window specification with the specified inactivity gap.
* Using the method implicitly sets the grace period to zero which
* means that out of order records arriving after the window end will be dropped
*
* <p>
* Note that new events may change the boundaries of session windows, so aggressive
* close times can lead to surprising results in which an out-of-order event is rejected and then
* a subsequent event moves the window boundary forward.
*
* @param inactivityGap the gap of inactivity between sessions
* @return a window definition with the window size and no grace period. Note that this means out of order records arriving after the window end will be dropped
* @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds}
*/
public static SessionWindows ofInactivityGapWithNoGrace(final Duration inactivityGap) {
return ofInactivityGapAndGrace(inactivityGap, ofMillis(NO_GRACE_PERIOD));
}
/**
* Creates a new window specification with the specified inactivity gap.
* Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd} which
* means that out of order records arriving after the window end will be dropped
*
* <p>
* Note that new events may change the boundaries of session windows, so aggressive
* close times can lead to surprising results in which an out-of-order event is rejected and then
* a subsequent event moves the window boundary forward.
*
* @param inactivityGap the gap of inactivity between sessions
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @return A SessionWindows object with the specified inactivity gap and grace period
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
*/
public static SessionWindows ofInactivityGapAndGrace(final Duration inactivityGap, final Duration afterWindowEnd) {
return new SessionWindows(inactivityGap.toMillis(), afterWindowEnd.toMillis());
}
/**
* Create a new window specification with the specified inactivity gap.
*
* @param inactivityGap the gap of inactivity between sessions
* @return a new window specification with default maintain duration of 1 day
*
* @return a new window specification without specifying a grace period (uses old default of 24 hours)
* @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofInactivityGapWithNoGrace(Duration)} instead
*/
@Deprecated
public static SessionWindows with(final Duration inactivityGap) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
final long inactivityGapMs = validateMillisecondDuration(inactivityGap, msgPrefix);
if (inactivityGapMs <= 0) {
throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
}
return new SessionWindows(inactivityGapMs, DEFAULT_GRACE_PERIOD_MS);
return new SessionWindows(inactivityGapMs, DEPRECATED_OLD_24_HR_GRACE_PERIOD);
}
/**
@ -108,13 +154,12 @@ public final class SessionWindows {
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, Duration)} instead
*/
@Deprecated
public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
return new SessionWindows(gapMs, afterWindowEndMs);
}

View File

@ -17,10 +17,14 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.TimestampExtractor;
import java.time.Duration;
import java.util.Objects;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
/**
* A sliding window used for aggregating events.
@ -78,6 +82,45 @@ public final class SlidingWindows {
private SlidingWindows(final long timeDifferenceMs, final long graceMs) {
this.timeDifferenceMs = timeDifferenceMs;
this.graceMs = graceMs;
if (timeDifferenceMs < 0) {
throw new IllegalArgumentException("Window time difference must not be negative.");
}
if (graceMs < 0) {
throw new IllegalArgumentException("Window grace period must not be negative.");
}
}
/**
* Return a window definition with the window size
* Using the method implicitly sets the grace period to zero which means that
* out of order records arriving after the window end will be dropped
*
* @param timeDifference the max time difference (inclusive) between two records in a window
* @return a new window definition with no grace period. Note that this means out of order records arriving after the window end will be dropped
* @throws IllegalArgumentException if the timeDifference is negative
*/
public static SlidingWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) throws IllegalArgumentException {
return ofTimeDifferenceAndGrace(timeDifference, ofMillis(NO_GRACE_PERIOD));
}
/**
* Return a window definition with the window size based on the given maximum time difference (inclusive) between
* records in the same window and given window grace period. Reject out-of-order events that arrive after {@code afterWindowEnd}.
* A window is closed when {@code stream-time > window-end + grace-period}.
*
* @param timeDifference the max time difference (inclusive) between two records in a window
* @param afterWindowEnd the grace period to admit out-of-order events to a window
* @return a new window definition with the specified grace period
* @throws IllegalArgumentException if the timeDifference or afterWindowEnd (grace period) is negative
*/
public static SlidingWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) throws IllegalArgumentException {
final long timeDifferenceMs = timeDifference.toMillis();
final long afterWindowEndMs = afterWindowEnd.toMillis();
return new SlidingWindows(timeDifferenceMs, afterWindowEndMs);
}
/**
@ -89,18 +132,16 @@ public final class SlidingWindows {
* @param grace the grace period to admit out-of-order events to a window
* @return a new window definition
* @throws IllegalArgumentException if the specified window size is &lt; 0 or grace &lt; 0, or either can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} or {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
*/
@Deprecated
public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference, final Duration grace) throws IllegalArgumentException {
final String msgPrefixSize = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefixSize);
if (timeDifferenceMs < 0) {
throw new IllegalArgumentException("Window time difference must not be negative.");
}
final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, "grace");
final long graceMs = validateMillisecondDuration(grace, msgPrefixGrace);
if (graceMs < 0) {
throw new IllegalArgumentException("Window grace period must not be negative.");
}
return new SlidingWindows(timeDifferenceMs, graceMs);
}

View File

@ -24,6 +24,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
@ -72,6 +73,59 @@ public final class TimeWindows extends Windows<TimeWindow> {
this.sizeMs = sizeMs;
this.advanceMs = advanceMs;
this.graceMs = graceMs;
if (sizeMs <= 0) {
throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
}
if (graceMs < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
}
/**
* Return a window definition with the given window size, and with the advance interval being equal to the window
* size.
* The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
* <p>
* This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
* Tumbling windows are a special case of hopping windows with {@code advance == size}.
* Using the method implicitly sets the grace period to zero which means
* that out of order records arriving after the window end will be dropped
*
* @param size The size of the window
* @return a new window definition with default no grace period. Note that this means out of order records arriving after the window end will be dropped
* @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
*/
public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {
return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
}
/**
* Return a window definition with the given window size, and with the advance interval being equal to the window
* size.
* The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
* <p>
* This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
* Tumbling windows are a special case of hopping windows with {@code advance == size}.
* Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd} which means
* that out of order records arriving after the window end will be dropped.
*
* <p>
* Delay is defined as (stream_time - record_timestamp).
*
* @param size The size of the window. Must be larger than zero
* @param afterWindowEnd The grace period to admit out-of-order events to a window. Must be non-negative.
* @return a TimeWindows object with the specified size and the specified grace period
* @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
*/
public static TimeWindows ofSizeAndGrace(final Duration size, final Duration afterWindowEnd)
throws IllegalArgumentException {
final long sizeMs = size.toMillis();
final long afterWindowEndMs = afterWindowEnd.toMillis();
return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs);
}
/**
@ -83,16 +137,16 @@ public final class TimeWindows extends Windows<TimeWindow> {
* Tumbling windows are a special case of hopping windows with {@code advance == size}.
*
* @param size The size of the window
* @return a new window definition with default maintain duration of 1 day
* @return a new window definition without specifying the grace period (uses old default of 24 hours)
* @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofSizeWithNoGrace(Duration)} } instead
*/
@Deprecated
public static TimeWindows of(final Duration size) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
final long sizeMs = validateMillisecondDuration(size, msgPrefix);
if (sizeMs <= 0) {
throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
}
return new TimeWindows(sizeMs, sizeMs, DEFAULT_GRACE_PERIOD_MS);
return new TimeWindows(sizeMs, sizeMs, DEPRECATED_OLD_24_HR_GRACE_PERIOD);
}
/**
@ -142,7 +196,9 @@ public final class TimeWindows extends Windows<TimeWindow> {
* @param afterWindowEnd The grace period to admit out-of-order events to a window.
* @return this updated builder
* @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
* @deprecated since 3.0 Use {@link #ofSizeAndGrace(Duration, Duration)} instead
*/
@Deprecated
public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);

View File

@ -38,9 +38,17 @@ import java.util.Map;
*/
public abstract class Windows<W extends Window> {
// By default grace period is 24 hours for all windows,
// in other words we allow out-of-order data for up to a day
protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L;
/**
* By default grace period is 24 hours for all windows in other words we allow out-of-order data for up to a day
* This behavior is now deprecated and additional details are available in the motivation for the KIP
* Check out <a href="https://cwiki.apache.org/confluence/x/Ho2NCg">KIP-633</a> for more details
*/
protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L;
/**
* This constant is used as the specified grace period where we do not have any grace periods instead of magic constants
*/
protected static final long NO_GRACE_PERIOD = 0L;
protected Windows() {}

View File

@ -62,6 +62,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
@SuppressWarnings("deprecation")
public class TopologyTest {
private final StoreBuilder<MockKeyValueStore> storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);

View File

@ -341,6 +341,7 @@ public abstract class AbstractResetIntegrationTest {
}
}
@SuppressWarnings("deprecation")
private Topology setupTopologyWithIntermediateTopic(final boolean useRepartitioned,
final String outputTopic2) {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -71,6 +71,7 @@ import static org.junit.Assert.assertTrue;
/**
* Tests related to internal topics in streams
*/
@SuppressWarnings("deprecation")
@Category({IntegrationTest.class})
public class InternalTopicIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

View File

@ -49,6 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
@SuppressWarnings("deprecation")
@Category({IntegrationTest.class})
public class JoinStoreIntegrationTest {

View File

@ -65,6 +65,7 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
* by virtue of having a large commit interval
*/
@Category({IntegrationTest.class})
@SuppressWarnings("deprecation")
public class KStreamAggregationDedupIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;

View File

@ -98,7 +98,7 @@ import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "deprecation"})
@Category({IntegrationTest.class})
public class KStreamAggregationIntegrationTest {
private static final int NUM_BROKERS = 1;
@ -209,6 +209,7 @@ public class KStreamAggregationIntegrationTest {
return keyComparison;
}
@SuppressWarnings("deprecation")
@Test
public void shouldReduceWindowed() throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
@ -219,6 +220,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondBatchTimestamp);
final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
//noinspection deprecation
groupedStream
.windowedBy(TimeWindows.of(ofMillis(500L)))
.reduce(reducer)
@ -318,6 +320,7 @@ public class KStreamAggregationIntegrationTest {
)));
}
@SuppressWarnings("deprecation")
@Test
public void shouldAggregateWindowed() throws Exception {
final long firstTimestamp = mockTime.milliseconds();
@ -328,6 +331,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondTimestamp);
final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
//noinspection deprecation
groupedStream.windowedBy(TimeWindows.of(ofMillis(500L)))
.aggregate(
initializer,
@ -442,12 +446,14 @@ public class KStreamAggregationIntegrationTest {
shouldCountHelper();
}
@SuppressWarnings("deprecation")
@Test
public void shouldGroupByKey() throws Exception {
final long timestamp = mockTime.milliseconds();
produceMessages(timestamp);
produceMessages(timestamp);
//noinspection deprecation
stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(500L)))
.count()
@ -476,6 +482,7 @@ public class KStreamAggregationIntegrationTest {
)));
}
@SuppressWarnings("deprecation")
@Test
public void shouldReduceSlidingWindows() throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
@ -487,6 +494,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(thirdBatchTimestamp);
final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
//noinspection deprecation
groupedStream
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L)))
.reduce(reducer)
@ -580,6 +588,7 @@ public class KStreamAggregationIntegrationTest {
}
}
@SuppressWarnings("deprecation")
@Test
public void shouldAggregateSlidingWindows() throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
@ -591,6 +600,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(thirdBatchTimestamp);
final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
//noinspection deprecation
groupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L), ofMinutes(5)))
.aggregate(
initializer,
@ -689,6 +699,7 @@ public class KStreamAggregationIntegrationTest {
}
@SuppressWarnings("deprecation")
@Test
public void shouldCountSessionWindows() throws Exception {
final long sessionGap = 5 * 60 * 1000L;
@ -761,6 +772,7 @@ public class KStreamAggregationIntegrationTest {
final Map<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);
//noinspection deprecation
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(ofMillis(sessionGap)))
@ -797,6 +809,7 @@ public class KStreamAggregationIntegrationTest {
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair(1L, t3)));
}
@SuppressWarnings("deprecation")
@Test
public void shouldReduceSessionWindows() throws Exception {
final long sessionGap = 1000L; // something to do with time
@ -869,6 +882,7 @@ public class KStreamAggregationIntegrationTest {
final Map<Windowed<String>, KeyValue<String, Long>> results = new HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);
final String userSessionsStore = "UserSessionsStore";
//noinspection deprecation
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(ofMillis(sessionGap)))

View File

@ -81,6 +81,7 @@ import static org.junit.Assert.assertTrue;
@RunWith(value = Parameterized.class)
@Category({IntegrationTest.class})
@SuppressWarnings("deprecation")
public class KStreamRepartitionIntegrationTest {
private static final int NUM_BROKERS = 1;

View File

@ -66,6 +66,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@Category({IntegrationTest.class})
@SuppressWarnings("deprecation")
public class MetricsIntegrationTest {
private static final int NUM_BROKERS = 1;

View File

@ -120,6 +120,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
@Category({IntegrationTest.class})
@SuppressWarnings("deprecation")
public class QueryableStateIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(QueryableStateIntegrationTest.class);

View File

@ -69,6 +69,7 @@ import static org.hamcrest.Matchers.notNullValue;
@Category({IntegrationTest.class})
@RunWith(Parameterized.class)
@SuppressWarnings("deprecation")
public class RocksDBMetricsIntegrationTest {
private static final int NUM_BROKERS = 3;

View File

@ -28,11 +28,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
@SuppressWarnings("deprecation")
public class JoinWindowsTest {
private static final long ANY_SIZE = 123L;
private static final long ANY_OTHER_SIZE = 456L; // should be larger than anySize
private static final long ANY_GRACE = 1024L;
@SuppressWarnings("deprecation")
@Test
public void validWindows() {
JoinWindows.of(ofMillis(ANY_OTHER_SIZE)) // [ -anyOtherSize ; anyOtherSize ]
@ -69,6 +72,8 @@ public class JoinWindowsTest {
@Test
public void timeDifferenceMustNotBeNegative() {
assertThrows(IllegalArgumentException.class, () -> JoinWindows.of(ofMillis(-1)));
assertThrows(IllegalArgumentException.class, () -> JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(-1)));
assertThrows(IllegalArgumentException.class, () -> JoinWindows.ofTimeDifferenceAndGrace(ofMillis(-1), ofMillis(ANY_GRACE)));
}
@Test
@ -133,6 +138,16 @@ public class JoinWindowsTest {
JoinWindows.of(ofMillis(9)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)),
JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60))
);
verifyEquality(
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3)),
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3))
);
verifyEquality(
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4)),
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(4))
);
}
@Test
@ -162,5 +177,15 @@ public class JoinWindowsTest {
JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(9)),
JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3))
);
verifyInEquality(
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(9)),
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(3))
);
verifyInEquality(
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(9), ofMillis(9)),
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(3), ofMillis(9))
);
}
}

View File

@ -38,6 +38,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@SuppressWarnings("deprecation")
public class RepartitionTopicNamingTest {
private final KeyValueMapper<String, String, String> kvMapper = (k, v) -> k + v;

View File

@ -25,12 +25,17 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
@SuppressWarnings("deprecation")
public class SessionWindowsTest {
@Test
public void shouldSetWindowGap() {
final long anyGap = 42L;
final long anyGrace = 1024L;
assertEquals(anyGap, SessionWindows.with(ofMillis(anyGap)).inactivityGap());
assertEquals(anyGap, SessionWindows.ofInactivityGapWithNoGrace(ofMillis(anyGap)).inactivityGap());
assertEquals(anyGap, SessionWindows.ofInactivityGapAndGrace(ofMillis(anyGap), ofMillis(anyGrace)).inactivityGap());
}
@Test
@ -66,6 +71,15 @@ public class SessionWindowsTest {
public void equalsAndHashcodeShouldBeValidForPositiveCases() {
verifyEquality(SessionWindows.with(ofMillis(1)), SessionWindows.with(ofMillis(1)));
verifyEquality(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)),
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1))
);
verifyEquality(
SessionWindows.ofInactivityGapAndGrace(ofMillis(1), ofMillis(11)),
SessionWindows.ofInactivityGapAndGrace(ofMillis(1), ofMillis(11))
);
verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));
verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(7)));
@ -75,6 +89,15 @@ public class SessionWindowsTest {
@Test
public void equalsAndHashcodeShouldBeValidForNegativeCases() {
verifyInEquality(
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(9)),
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)));
verifyInEquality(
SessionWindows.ofInactivityGapAndGrace(ofMillis(9), ofMillis(9)),
SessionWindows.ofInactivityGapAndGrace(ofMillis(1), ofMillis(9)));
verifyInEquality(SessionWindows.with(ofMillis(9)), SessionWindows.with(ofMillis(1)));
verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(9)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));

View File

@ -24,13 +24,17 @@ import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
@SuppressWarnings("deprecation")
public class SlidingWindowsTest {
private static final long ANY_SIZE = 123L;
private static final long ANY_GRACE = 1024L;
@Test
public void shouldSetTimeDifference() {
assertEquals(ANY_SIZE, SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE), ofMillis(3)).timeDifferenceMs());
assertEquals(ANY_SIZE, SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(ANY_SIZE), ofMillis(ANY_GRACE)).timeDifferenceMs());
assertEquals(ANY_SIZE, SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(ANY_SIZE)).timeDifferenceMs());
}
@Test
@ -56,6 +60,16 @@ public class SlidingWindowsTest {
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(grace)),
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(grace))
);
verifyEquality(
SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(grace)),
SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(grace))
);
verifyEquality(
SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(timeDifference)),
SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(timeDifference))
);
}
@Test

View File

@ -29,13 +29,17 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
@SuppressWarnings("deprecation")
public class TimeWindowsTest {
private static final long ANY_SIZE = 123L;
private static final long ANY_GRACE = 1024L;
@Test
public void shouldSetWindowSize() {
assertEquals(ANY_SIZE, TimeWindows.of(ofMillis(ANY_SIZE)).sizeMs);
assertEquals(ANY_SIZE, TimeWindows.ofSizeWithNoGrace(ofMillis(ANY_SIZE)).sizeMs);
assertEquals(ANY_SIZE, TimeWindows.ofSizeAndGrace(ofMillis(ANY_SIZE), ofMillis(ANY_GRACE)).sizeMs);
}
@Test
@ -140,10 +144,27 @@ public class TimeWindowsTest {
TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4)),
TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4))
);
verifyEquality(TimeWindows.ofSizeWithNoGrace(ofMillis(3)), TimeWindows.ofSizeWithNoGrace(ofMillis(3)));
verifyEquality(TimeWindows.ofSizeAndGrace(ofMillis(3), ofMillis(33)),
TimeWindows.ofSizeAndGrace(ofMillis(3), ofMillis(33))
);
}
@Test
public void equalsAndHashcodeShouldBeValidForNegativeCases() {
verifyInEquality(
TimeWindows.ofSizeWithNoGrace(ofMillis(9)),
TimeWindows.ofSizeWithNoGrace(ofMillis(3))
);
verifyInEquality(
TimeWindows.ofSizeAndGrace(ofMillis(9), ofMillis(9)),
TimeWindows.ofSizeAndGrace(ofMillis(3), ofMillis(9))
);
verifyInEquality(TimeWindows.of(ofMillis(9)), TimeWindows.of(ofMillis(3)));
verifyInEquality(TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)), TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)));

View File

@ -61,7 +61,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KGroupedStreamImplTest {

View File

@ -100,6 +100,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamImplTest {
@ -704,6 +705,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("materialized can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullOtherStreamOnJoin() {
final NullPointerException exception = assertThrows(
@ -712,6 +714,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullOtherStreamOnJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@ -724,6 +727,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerOnJoin() {
final NullPointerException exception = assertThrows(
@ -732,6 +736,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerWithKeyOnJoin() {
final NullPointerException exception = assertThrows(
@ -740,6 +745,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerOnJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@ -752,6 +758,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerWithKeyOnJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@ -784,6 +791,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("windows can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullStreamJoinedOnJoin() {
final NullPointerException exception = assertThrows(
@ -796,6 +804,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("streamJoined can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullOtherStreamOnLeftJoin() {
final NullPointerException exception = assertThrows(
@ -804,6 +813,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullOtherStreamOnLeftJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@ -816,6 +826,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerOnLeftJoin() {
final NullPointerException exception = assertThrows(
@ -824,6 +835,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerWithKeyOnLeftJoin() {
final NullPointerException exception = assertThrows(
@ -832,6 +844,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerOnLeftJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@ -844,6 +857,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerWithKeyOnLeftJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@ -877,6 +891,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("windows can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullStreamJoinedOnLeftJoin() {
final NullPointerException exception = assertThrows(
@ -889,6 +904,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("streamJoined can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullOtherStreamOnOuterJoin() {
final NullPointerException exception = assertThrows(
@ -897,6 +913,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullOtherStreamOnOuterJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@ -909,6 +926,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerOnOuterJoin() {
final NullPointerException exception = assertThrows(
@ -917,6 +935,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerWithKeyOnOuterJoin() {
final NullPointerException exception = assertThrows(
@ -925,6 +944,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerOnOuterJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@ -937,6 +957,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullValueJoinerWithKeyOnOuterJoinWithStreamJoined() {
final NullPointerException exception = assertThrows(
@ -969,6 +990,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("windows can't be null"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldNotAllowNullStreamJoinedOnOuterJoin() {
final NullPointerException exception = assertThrows(
@ -1511,6 +1533,7 @@ public class KStreamImplTest {
assertThat(mockProcessors.get(1).processed(), equalTo(Collections.singletonList(new KeyValueTimestamp<>("b", "v1", 0))));
}
@SuppressWarnings("deprecation")
@Test
public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreatedWithRetention() {
final StreamsBuilder builder = new StreamsBuilder();
@ -1538,6 +1561,7 @@ public class KStreamImplTest {
}
}
@SuppressWarnings("deprecation")
@Test
public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -63,6 +63,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamKStreamJoinTest {
private final String topic1 = "topic1";

View File

@ -53,6 +53,7 @@ import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamKStreamLeftJoinTest {
private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
@ -97,6 +98,7 @@ public class KStreamKStreamLeftJoinTest {
false
);
}
@Test
public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {
runLeftJoinWithoutSpuriousResultFix(

View File

@ -52,6 +52,7 @@ import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamKStreamOuterJoinTest {
private final String topic1 = "topic1";

View File

@ -59,6 +59,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThrows;
@SuppressWarnings("deprecation")
@RunWith(EasyMockRunner.class)
public class KStreamRepartitionTest {
private final String inputTopic = "input-topic";

View File

@ -69,6 +69,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamSessionWindowAggregateProcessorTest {

View File

@ -711,7 +711,7 @@ public class KStreamSlidingWindowAggregateTest {
builder
.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(100)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.toStringInstance("+"), Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()));
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
@ -743,7 +743,7 @@ public class KStreamSlidingWindowAggregateTest {
final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(90)))
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(90)))
.aggregate(
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
@ -807,7 +807,7 @@ public class KStreamSlidingWindowAggregateTest {
final KTable<Windowed<String>, String> table = builder
.stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(10000)))
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(10000)))
// The aggregator needs to sort the strings so the window value is the same for the final windows even when
// records are processed in a different order. Here, we sort alphabetically.
.aggregate(

View File

@ -62,6 +62,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation")
public class KStreamWindowAggregateTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final String threadId = Thread.currentThread().getName();

View File

@ -50,7 +50,7 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("deprecation")
public class SessionWindowedCogroupedKStreamImplTest {
private final StreamsBuilder builder = new StreamsBuilder();

View File

@ -54,6 +54,7 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("deprecation")
public class SlidingWindowedCogroupedKStreamImplTest {
private static final String TOPIC = "topic";

View File

@ -70,6 +70,7 @@ import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@SuppressWarnings("deprecation")
public class SuppressScenarioTest {
private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
private static final StringSerializer STRING_SERIALIZER = new StringSerializer();

View File

@ -39,6 +39,7 @@ import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@SuppressWarnings("deprecation")
public class SuppressTopologyTest {
private static final Serde<String> STRING_SERDE = Serdes.String();

View File

@ -27,6 +27,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation")
public class TimeWindowTest {
private long start = 50;

View File

@ -51,6 +51,7 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("deprecation")
public class TimeWindowedCogroupedKStreamImplTest {
private static final Long WINDOW_SIZE = 500L;

View File

@ -53,6 +53,7 @@ import java.util.regex.Pattern;
import static java.time.Duration.ofMillis;
import static org.junit.Assert.assertEquals;
@SuppressWarnings("deprecation")
public class StreamsGraphTest {
private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");

View File

@ -66,6 +66,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@SuppressWarnings("deprecation")
public class RepartitionOptimizingTest {
private final Logger log = LoggerFactory.getLogger(RepartitionOptimizingTest.class);

View File

@ -137,6 +137,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(value = Parameterized.class)
@SuppressWarnings("deprecation")
public class StreamsPartitionAssignorTest {
private static final String CONSUMER_1 = "consumer1";
private static final String CONSUMER_2 = "consumer2";

View File

@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
@SuppressWarnings("deprecation")
public class SmokeTestClient extends SmokeTestUtil {
private final String name;

View File

@ -47,6 +47,7 @@ import java.util.regex.Pattern;
import static java.time.Duration.ofMillis;
@SuppressWarnings("deprecation")
public class StreamsOptimizedTest {
public static void main(final String[] args) throws Exception {

View File

@ -58,6 +58,7 @@ public class GenericInMemoryKeyValueStore<K extends Comparable, V>
return this.name;
}
@SuppressWarnings("deprecation")
@Deprecated
@Override
/* This is a "dummy" store used for testing;

View File

@ -37,6 +37,7 @@ import java.util.TreeMap;
* This class is a generic version of the in-memory key-value store that is useful for testing when you
* need a basic KeyValueStore for arbitrary types and don't have/want to write a serde
*/
@SuppressWarnings("deprecation")
public class GenericInMemoryTimestampedKeyValueStore<K extends Comparable, V>
extends WrappedStateStore<StateStore, K, ValueAndTimestamp<V>>
implements TimestampedKeyValueStore<K, V> {

View File

@ -54,6 +54,7 @@ import scala.jdk.CollectionConverters._
* @param inner The underlying Java abstraction for KStream
* @see `org.apache.kafka.streams.kstream.KStream`
*/
//noinspection ScalaDeprecation
class KStream[K, V](val inner: KStreamJ[K, V]) {
/**

View File

@ -51,6 +51,7 @@ import scala.jdk.CollectionConverters._
/**
* Test suite that verifies that the topology built by the Java and Scala APIs match.
*/
//noinspection ScalaDeprecation
class TopologyTest {
private val inputTopic = "input-topic"
@ -377,14 +378,16 @@ class TopologyTest {
mappedStream
.filter((k: String, _: String) => k == "A")
.join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
.join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString,
JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)))(
StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.intSerde)
)
.to(JOINED_TOPIC)
mappedStream
.filter((k: String, _: String) => k == "A")
.join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
.join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString,
JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)))(
StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.stringSerde)
)
.to(JOINED_TOPIC)
@ -433,18 +436,22 @@ class TopologyTest {
mappedStream
.filter((key, _) => key == "A")
.join[Integer, String](stream2,
valueJoiner2,
JoinWindows.of(Duration.ofMillis(5000)),
StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.Integer))
.join[Integer, String](
stream2,
valueJoiner2,
JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)),
StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.Integer)
)
.to(JOINED_TOPIC)
mappedStream
.filter((key, _) => key == "A")
.join(stream3,
valueJoiner3,
JoinWindows.of(Duration.ofMillis(5000)),
StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.String))
.join(
stream3,
valueJoiner3,
JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)),
StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.String)
)
.to(JOINED_TOPIC)
builder

View File

@ -17,9 +17,7 @@
package org.apache.kafka.streams.scala.kstream
import java.time.Duration.ofSeconds
import java.time.Instant
import java.util.regex.Pattern
import java.time.{Duration, Instant}
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{
JoinWindows,
@ -192,6 +190,7 @@ class KStreamTest extends TestDriver {
testDriver.close()
}
//noinspection ScalaDeprecation
@Test
def testJoinCorrectlyRecords(): Unit = {
val builder = new StreamsBuilder()
@ -201,7 +200,9 @@ class KStreamTest extends TestDriver {
val stream1 = builder.stream[String, String](sourceTopic1)
val stream2 = builder.stream[String, String](sourceTopic2)
stream1.join(stream2)((a, b) => s"$a-$b", JoinWindows.of(ofSeconds(1))).to(sinkTopic)
stream1
.join(stream2)((a, b) => s"$a-$b", JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(1), Duration.ofHours(24)))
.to(sinkTopic)
val now = Instant.now()
@ -464,23 +465,4 @@ class KStreamTest extends TestDriver {
val transformNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1)
assertEquals("my-name", transformNode.name())
}
@Test
def testSettingNameOnStream(): Unit = {
val builder = new StreamsBuilder()
val topicsPattern = "t-[A-Za-z0-9-].suffix"
val sinkTopic = "sink"
builder
.stream[String, String](Pattern.compile(topicsPattern))(
Consumed.`with`[String, String].withName("my-fancy-name")
)
.to(sinkTopic)
import scala.jdk.CollectionConverters._
val streamNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.head
assertEquals("my-fancy-name", streamNode.name())
}
}

View File

@ -36,6 +36,7 @@ import java.time.Duration.ofMillis
import scala.jdk.CollectionConverters._
//noinspection ScalaDeprecation
class KTableTest extends TestDriver {
@Test
@ -166,7 +167,7 @@ class KTableTest extends TestDriver {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
val window = TimeWindows.of(Duration.ofSeconds(1L))
val window = TimeWindows.ofSizeAndGrace(Duration.ofSeconds(1L), Duration.ofHours(24))
val suppression = JSuppressed.untilTimeLimit[Windowed[String]](Duration.ofSeconds(2L), BufferConfig.unbounded())
val table: KTable[Windowed[String], Long] = builder
@ -224,7 +225,7 @@ class KTableTest extends TestDriver {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
val window = SlidingWindows.withTimeDifferenceAndGrace(ofMillis(1000L), ofMillis(1000L))
val window = SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(1000L), ofMillis(1000L))
val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded())
val table: KTable[Windowed[String], Long] = builder
@ -262,7 +263,7 @@ class KTableTest extends TestDriver {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
val window = TimeWindows.of(Duration.ofSeconds(1L)).grace(Duration.ofSeconds(1L))
val window = TimeWindows.ofSizeAndGrace(Duration.ofSeconds(1L), Duration.ofSeconds(1L))
val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded())
val table: KTable[Windowed[String], Long] = builder
@ -321,7 +322,7 @@ class KTableTest extends TestDriver {
val sourceTopic = "source"
val sinkTopic = "sink"
// Very similar to SuppressScenarioTest.shouldSupportFinalResultsForSessionWindows
val window = SessionWindows.`with`(Duration.ofMillis(5L)).grace(Duration.ofMillis(10L))
val window = SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(5L), Duration.ofMillis(10L))
val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded())
val table: KTable[Windowed[String], Long] = builder