KAFKA-12344 Support SlidingWindows in the Scala API (#10519)

Support SlidingWindows in the Scala API

Reviewers: Leah Thomas <lthomas@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
ketulgupta1995 2021-04-27 00:08:48 +05:30 committed by GitHub
parent 031b7208b3
commit d949f1094e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 76 additions and 3 deletions

View File

@ -17,7 +17,13 @@
package org.apache.kafka.streams.scala
package kstream
import org.apache.kafka.streams.kstream.{CogroupedKStream => CogroupedKStreamJ, SessionWindows, Window, Windows}
import org.apache.kafka.streams.kstream.{
SessionWindows,
SlidingWindows,
Window,
Windows,
CogroupedKStream => CogroupedKStreamJ
}
import org.apache.kafka.streams.scala.FunctionsCompatConversions.{AggregatorFromFunction, InitializerFromFunction}
/**
@ -82,6 +88,16 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
def windowedBy[W <: Window](windows: Windows[W]): TimeWindowedCogroupedKStream[KIn, VOut] =
new TimeWindowedCogroupedKStream(inner.windowedBy(windows))
/**
* Create a new [[TimeWindowedCogroupedKStream]] instance that can be used to perform sliding windowed aggregations.
*
* @param windows the specification of the aggregation `SlidingWindows`
* @return an instance of [[TimeWindowedCogroupedKStream]]
* @see `org.apache.kafka.streams.kstream.CogroupedKStream#windowedBy`
*/
def windowedBy(windows: SlidingWindows): TimeWindowedCogroupedKStream[KIn, VOut] =
new TimeWindowedCogroupedKStream(inner.windowedBy(windows))
/**
* Create a new [[SessionWindowedKStream]] instance that can be used to perform session windowed aggregations.
*

View File

@ -21,6 +21,7 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl
import org.apache.kafka.streams.scala.serialization.Serdes
import org.apache.kafka.streams.kstream.{
SessionWindows,
SlidingWindows,
Window,
Windows,
KGroupedStream => KGroupedStreamJ,
@ -155,6 +156,16 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
def windowedBy[W <: Window](windows: Windows[W]): TimeWindowedKStream[K, V] =
new TimeWindowedKStream(inner.windowedBy(windows))
/**
* Create a new [[TimeWindowedKStream]] instance that can be used to perform sliding windowed aggregations.
*
* @param windows the specification of the aggregation `SlidingWindows`
* @return an instance of [[TimeWindowedKStream]]
* @see `org.apache.kafka.streams.kstream.KGroupedStream#windowedBy`
*/
def windowedBy(windows: SlidingWindows): TimeWindowedKStream[K, V] =
new TimeWindowedKStream(inner.windowedBy(windows))
/**
* Create a new [[SessionWindowedKStream]] instance that can be used to perform session windowed aggregations.
*

View File

@ -17,15 +17,23 @@
package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig
import org.apache.kafka.streams.kstream.{Named, SessionWindows, TimeWindows, Windowed, Suppressed => JSuppressed}
import org.apache.kafka.streams.kstream.{
Named,
SlidingWindows,
SessionWindows,
TimeWindows,
Windowed,
Suppressed => JSuppressed
}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.utils.TestDriver
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
import org.junit.jupiter.api.Test
import java.time.Duration
import java.time.Duration.ofMillis
import scala.jdk.CollectionConverters._
class KTableTest extends TestDriver {
@ -211,6 +219,44 @@ class KTableTest extends TestDriver {
testDriver.close()
}
@Test
def testCorrectlyGroupByKeyWindowedBySlidingWindow(): Unit = {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
val window = SlidingWindows.withTimeDifferenceAndGrace(ofMillis(1000L), ofMillis(1000L))
val suppression = JSuppressed.untilWindowCloses(BufferConfig.unbounded())
val table: KTable[Windowed[String], Long] = builder
.stream[String, String](sourceTopic)
.groupByKey
.windowedBy(window)
.count()
.suppress(suppression)
table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
val testDriver = createTestDriver(builder)
val testInput = testDriver.createInput[String, String](sourceTopic)
val testOutput = testDriver.createOutput[String, Long](sinkTopic)
{
// publish key=1 @ time 0 => count==1
testInput.pipeInput("1", "value1", 0L)
assertTrue(testOutput.isEmpty)
}
{
// move event time right past the grace period of the first window.
testInput.pipeInput("2", "value3", 5001L)
val record = testOutput.readKeyValue
assertEquals("0:1000:1", record.key)
assertEquals(1L, record.value)
}
assertTrue(testOutput.isEmpty)
testDriver.close()
}
@Test
def testCorrectlySuppressResultsUsingSuppressedUntilWindowClosesByWindowed(): Unit = {
val builder = new StreamsBuilder()