From d949f1094e2dc270ecbdd7b84aa8470690c697e4 Mon Sep 17 00:00:00 2001 From: ketulgupta1995 Date: Tue, 27 Apr 2021 00:08:48 +0530 Subject: [PATCH] KAFKA-12344 Support SlidingWindows in the Scala API (#10519) Support SlidingWindows in the Scala API Reviewers: Leah Thomas , Anna Sophie Blee-Goldman --- .../scala/kstream/CogroupedKStream.scala | 18 ++++++- .../scala/kstream/KGroupedStream.scala | 11 ++++ .../streams/scala/kstream/KTableTest.scala | 50 ++++++++++++++++++- 3 files changed, 76 insertions(+), 3 deletions(-) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala index c23deb8ddeb..f4fe9fc5ca8 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala @@ -17,7 +17,13 @@ package org.apache.kafka.streams.scala package kstream -import org.apache.kafka.streams.kstream.{CogroupedKStream => CogroupedKStreamJ, SessionWindows, Window, Windows} +import org.apache.kafka.streams.kstream.{ + SessionWindows, + SlidingWindows, + Window, + Windows, + CogroupedKStream => CogroupedKStreamJ +} import org.apache.kafka.streams.scala.FunctionsCompatConversions.{AggregatorFromFunction, InitializerFromFunction} /** @@ -82,6 +88,16 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) { def windowedBy[W <: Window](windows: Windows[W]): TimeWindowedCogroupedKStream[KIn, VOut] = new TimeWindowedCogroupedKStream(inner.windowedBy(windows)) + /** + * Create a new [[TimeWindowedCogroupedKStream]] instance that can be used to perform sliding windowed aggregations. + * + * @param windows the specification of the aggregation `SlidingWindows` + * @return an instance of [[TimeWindowedCogroupedKStream]] + * @see `org.apache.kafka.streams.kstream.CogroupedKStream#windowedBy` + */ + def windowedBy(windows: SlidingWindows): TimeWindowedCogroupedKStream[KIn, VOut] = + new TimeWindowedCogroupedKStream(inner.windowedBy(windows)) + /** * Create a new [[SessionWindowedKStream]] instance that can be used to perform session windowed aggregations. * diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala index d3f1ea096b2..44a3e568d85 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala @@ -21,6 +21,7 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl import org.apache.kafka.streams.scala.serialization.Serdes import org.apache.kafka.streams.kstream.{ SessionWindows, + SlidingWindows, Window, Windows, KGroupedStream => KGroupedStreamJ, @@ -155,6 +156,16 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) { def windowedBy[W <: Window](windows: Windows[W]): TimeWindowedKStream[K, V] = new TimeWindowedKStream(inner.windowedBy(windows)) + /** + * Create a new [[TimeWindowedKStream]] instance that can be used to perform sliding windowed aggregations. + * + * @param windows the specification of the aggregation `SlidingWindows` + * @return an instance of [[TimeWindowedKStream]] + * @see `org.apache.kafka.streams.kstream.KGroupedStream#windowedBy` + */ + def windowedBy(windows: SlidingWindows): TimeWindowedKStream[K, V] = + new TimeWindowedKStream(inner.windowedBy(windows)) + /** * Create a new [[SessionWindowedKStream]] instance that can be used to perform session windowed aggregations. * diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala index ccf955e8889..15e090dc8f9 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala @@ -17,15 +17,23 @@ package org.apache.kafka.streams.scala.kstream import org.apache.kafka.streams.kstream.Suppressed.BufferConfig -import org.apache.kafka.streams.kstream.{Named, SessionWindows, TimeWindows, Windowed, Suppressed => JSuppressed} +import org.apache.kafka.streams.kstream.{ + Named, + SlidingWindows, + SessionWindows, + TimeWindows, + Windowed, + Suppressed => JSuppressed +} import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.serialization.Serdes._ import org.apache.kafka.streams.scala.utils.TestDriver import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder} import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue} import org.junit.jupiter.api.Test - import java.time.Duration +import java.time.Duration.ofMillis + import scala.jdk.CollectionConverters._ class KTableTest extends TestDriver { @@ -211,6 +219,44 @@ class KTableTest extends TestDriver { testDriver.close() } + @Test + def testCorrectlyGroupByKeyWindowedBySlidingWindow(): Unit = { + val builder = new StreamsBuilder() + val sourceTopic = "source" + val sinkTopic = "sink" + val window = SlidingWindows.withTimeDifferenceAndGrace(ofMillis(1000L), ofMillis(1000L)) + val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded()) + + val table: KTable[Windowed[String], Long] = builder + .stream[String, String](sourceTopic) + .groupByKey + .windowedBy(window) + .count() + .suppress(suppression) + + table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic) + + val testDriver = createTestDriver(builder) + val testInput = testDriver.createInput[String, String](sourceTopic) + val testOutput = testDriver.createOutput[String, Long](sinkTopic) + + { + // publish key=1 @ time 0 => count==1 + testInput.pipeInput("1", "value1", 0L) + assertTrue(testOutput.isEmpty) + } + { + // move event time right past the grace period of the first window. + testInput.pipeInput("2", "value3", 5001L) + val record = testOutput.readKeyValue + assertEquals("0:1000:1", record.key) + assertEquals(1L, record.value) + } + assertTrue(testOutput.isEmpty) + + testDriver.close() + } + @Test def testCorrectlySuppressResultsUsingSuppressedUntilWindowClosesByWindowed(): Unit = { val builder = new StreamsBuilder()