mirror of https://github.com/apache/kafka.git
KAFKA-7778: Add KTable.suppress to Scala API (#6314)
Detailed description

* Adds KTable.suppress to the Scala API.
* Fixes count in KGroupedStream, SessionWindowedKStream, and TimeWindowedKStream so that the value serde gets passed down to the KTable returned by the internal mapValues method.
* Adds Suppress API support for Java 1.8 + Scala 2.11.

Testing strategy

I added unit tests covering:

* Windowed KTable.count.suppress w/ Suppressed.untilTimeLimit
* Windowed KTable.count.suppress w/ Suppressed.untilWindowCloses
* Non-windowed KTable.count.suppress w/ Suppressed.untilTimeLimit
* Session-windowed KTable.count.suppress w/ Suppressed.untilWindowCloses

Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:

parent 7bd81628d9
commit b7d7f7590d
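Taken together, these changes let a Kafka Streams application written against the Scala API suppress intermediate results without dropping down to the Java interface. A minimal usage sketch of the new surface (the topic names, windowing parameters, and surrounding application wiring are illustrative assumptions, not part of this commit):

import java.time.Duration

import org.apache.kafka.streams.kstream.{TimeWindows, Windowed}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KTable, Suppressed}
import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig

object SuppressionExample extends App {
  val builder = new StreamsBuilder()

  // Count events per key in 1-minute windows, and emit each window's count
  // exactly once, after the window (including its grace period) has closed.
  val finalCounts: KTable[Windowed[String], Long] =
    builder
      .stream[String, String]("events") // assumed input topic
      .groupByKey
      .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(30)))
      .count()
      .suppress(Suppressed.untilWindowCloses[String](BufferConfig.unbounded()))

  // Flatten the windowed key to a string, as the tests below do, and publish.
  finalCounts
    .toStream((windowedKey, _) => s"${windowedKey.window().start()}:${windowedKey.key()}")
    .to("final-counts") // assumed output topic
}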
Suppressed.java

@@ -33,11 +33,20 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
     }
 
+    /**
+     * Marker interface for a buffer configuration that will strictly enforce size constraints
+     * (bytes and/or number of records) on the buffer, so it is suitable for reducing duplicate
+     * results downstream, but does not promise to eliminate them entirely.
+     */
+    interface EagerBufferConfig extends BufferConfig<EagerBufferConfig> {
+
+    }
+
     interface BufferConfig<BC extends BufferConfig<BC>> {
         /**
          * Create a size-constrained buffer in terms of the maximum number of keys it will store.
          */
-        static BufferConfig<?> maxRecords(final long recordLimit) {
+        static EagerBufferConfig maxRecords(final long recordLimit) {
             return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE);
         }
 
 

@@ -49,7 +58,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
         /**
          * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
          */
-        static BufferConfig<?> maxBytes(final long byteLimit) {
+        static EagerBufferConfig maxBytes(final long byteLimit) {
             return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit);
         }
 

@@ -108,7 +117,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
          * This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
          * duplicate results downstream, but does not promise to eliminate them.
          */
-        BufferConfig emitEarlyWhenFull();
+        EagerBufferConfig emitEarlyWhenFull();
     }
 
     /**
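The point of narrowing these return types from BufferConfig<?> to EagerBufferConfig is fluent chaining: a buffer built from maxRecords or maxBytes keeps its precise type through further withMax* calls, and can be converted to a strict config explicitly rather than via casts. A small sketch using the Scala wrapper added later in this commit (the val names are ours):

import org.apache.kafka.streams.kstream.Suppressed.{EagerBufferConfig, StrictBufferConfig}
import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig

object BufferConfigChaining {
  // Every step preserves the EagerBufferConfig type, so the chain needs no casts.
  val eager: EagerBufferConfig =
    BufferConfig.maxRecords(1000L).withMaxBytes(1024L * 1024L)

  // Same bounds, but shut down instead of emitting early when the buffer fills.
  val strict: StrictBufferConfig = eager.shutDownWhenFull()
}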
BufferConfigInternal.java

@@ -43,7 +43,7 @@ abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC>> impl
     }
 
     @Override
-    public Suppressed.BufferConfig emitEarlyWhenFull() {
+    public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
         return new EagerBufferConfigImpl(maxRecords(), maxBytes());
     }
 }
EagerBufferConfigImpl.java

@@ -20,7 +20,7 @@ import org.apache.kafka.streams.kstream.Suppressed;
 
 import java.util.Objects;
 
-public class EagerBufferConfigImpl extends BufferConfigInternal {
+public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.EagerBufferConfig> implements Suppressed.EagerBufferConfig {
 
     private final long maxRecords;
     private final long maxBytes;

@@ -31,12 +31,12 @@ public class EagerBufferConfigImpl extends BufferConfigInternal {
     }
 
     @Override
-    public Suppressed.BufferConfig withMaxRecords(final long recordLimit) {
+    public Suppressed.EagerBufferConfig withMaxRecords(final long recordLimit) {
         return new EagerBufferConfigImpl(recordLimit, maxBytes);
     }
 
     @Override
-    public Suppressed.BufferConfig withMaxBytes(final long byteLimit) {
+    public Suppressed.EagerBufferConfig withMaxBytes(final long byteLimit) {
         return new EagerBufferConfigImpl(maxRecords, byteLimit);
     }
 
KGroupedStream.scala

@@ -20,7 +20,8 @@
 package org.apache.kafka.streams.scala
 package kstream
 
-import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _}
+import org.apache.kafka.streams.kstream.internals.KTableImpl
+import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, KTable => KTableJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
 

@@ -46,9 +47,13 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
    */
   def count()(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = {
-    val c: KTable[K, java.lang.Long] =
+    val javaCountTable: KTableJ[K, java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
-    c.mapValues[Long](Long2long _)
+    val tableImpl = javaCountTable.asInstanceOf[KTableImpl[K, ByteArrayKeyValueStore, java.lang.Long]]
+    javaCountTable.mapValues[Long](
+      ((l: java.lang.Long) => Long2long(l)).asValueMapper,
+      Materialized.`with`[K, Long, ByteArrayKeyValueStore](tableImpl.keySerde(), Serdes.Long)
+    )
   }
 
   /**
KTable.scala

@@ -159,6 +159,18 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   def toStream[KR](mapper: (K, V) => KR): KStream[KR, V] =
     inner.toStream[KR](mapper.asKeyValueMapper)
 
+  /**
+   * Suppress some updates from this changelog stream, determined by the supplied [[Suppressed]] configuration.
+   *
+   * This controls what updates downstream table and stream operations will receive.
+   *
+   * @param suppressed Configuration object determining what, if any, updates to suppress.
+   * @return A new KTable with the desired suppression characteristics.
+   * @see `org.apache.kafka.streams.kstream.KTable#suppress`
+   */
+  def suppress(suppressed: Suppressed[_ >: K]): KTable[K, V] =
+    inner.suppress(suppressed)
+
   /**
    * Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type).
    * Transform the value of each input record into a new value (with possible new type) of the output record.
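On a non-windowed table, suppress effectively rate-limits the changelog: each key's latest value is held back for the configured interval, and superseded intermediate updates are dropped. A hedged sketch (topic name and wiring are illustrative assumptions):

import java.time.Duration

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KTable, Suppressed}
import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig

object DebouncedCounts {
  val builder = new StreamsBuilder()

  // Emit at most one count update per key per 10 seconds of stream time;
  // a newer count replaces the buffered one without restarting the timer.
  val debounced: KTable[String, Long] =
    builder
      .stream[String, String]("clicks") // assumed input topic
      .groupByKey
      .count()
      .suppress(Suppressed.untilTimeLimit[String](Duration.ofSeconds(10), BufferConfig.unbounded()))
}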
SessionWindowedKStream.scala

@@ -20,7 +20,8 @@
 package org.apache.kafka.streams.scala
 package kstream
 
-import org.apache.kafka.streams.kstream.{SessionWindowedKStream => SessionWindowedKStreamJ, _}
+import org.apache.kafka.streams.kstream.internals.KTableImpl
+import org.apache.kafka.streams.kstream.{KTable => KTableJ, SessionWindowedKStream => SessionWindowedKStreamJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
 

@@ -60,9 +61,13 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
    */
   def count()(implicit materialized: Materialized[K, Long, ByteArraySessionStore]): KTable[Windowed[K], Long] = {
-    val c: KTable[Windowed[K], java.lang.Long] =
+    val javaCountTable: KTableJ[Windowed[K], java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArraySessionStore]])
-    c.mapValues[Long](Long2long _)
+    val tableImpl = javaCountTable.asInstanceOf[KTableImpl[Windowed[K], ByteArraySessionStore, java.lang.Long]]
+    javaCountTable.mapValues[Long](
+      ((l: java.lang.Long) => Long2long(l)).asValueMapper,
+      Materialized.`with`[Windowed[K], Long, ByteArrayKeyValueStore](tableImpl.keySerde(), Serdes.Long)
+    )
   }
 
   /**
Suppressed.scala (new file)

@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+import java.time.Duration
+
+import org.apache.kafka.streams.kstream.{Windowed, Suppressed => SupressedJ}
+import org.apache.kafka.streams.kstream.Suppressed.{
+  EagerBufferConfig,
+  StrictBufferConfig,
+  BufferConfig => BufferConfigJ
+}
+import org.apache.kafka.streams.kstream.internals.suppress.{
+  EagerBufferConfigImpl,
+  FinalResultsSuppressionBuilder,
+  StrictBufferConfigImpl,
+  SuppressedInternal
+}
+
+/**
+ * Duplicates the static factory methods inside the Java interface [[org.apache.kafka.streams.kstream.Suppressed]].
+ *
+ * This is required for compatibility w/ Scala 2.11 + Java 1.8 because the Scala 2.11 compiler doesn't support the use
+ * of static methods inside Java interfaces.
+ *
+ * TODO: Deprecate this class if support for Scala 2.11 + Java 1.8 is dropped.
+ */
+object Suppressed {
+
+  /**
+   * Configure the suppression to emit only the "final results" from the window.
+   *
+   * By default all Streams operators emit results whenever new results are available.
+   * This includes windowed operations.
+   *
+   * This configuration will instead emit just one result per key for each window, guaranteeing
+   * to deliver only the final result. This option is suitable for use cases in which the business logic
+   * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
+   *
+   * To accomplish this, the operator will buffer events from the window until the window close (that is,
+   * until the end-time passes, and additionally until the grace period expires). Since windowed operators
+   * are required to reject late events for a window whose grace period is expired, there is an additional
+   * guarantee that the final results emitted from this suppression will match any queriable state upstream.
+   *
+   * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
+   *                     This is required to be a "strict" config, since it would violate the "final results"
+   *                     property to emit early and then issue an update later.
+   * @tparam K The [[Windowed]] key type for the KTable to apply this suppression to.
+   * @return a "final results" mode suppression configuration
+   * @see [[org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses]]
+   */
+  def untilWindowCloses[K](bufferConfig: StrictBufferConfig): SupressedJ[Windowed[K]] =
+    new FinalResultsSuppressionBuilder[Windowed[K]](null, bufferConfig)
+
+  /**
+   * Configure the suppression to wait `timeToWaitForMoreEvents` amount of time after receiving a record
+   * before emitting it further downstream. If another record for the same key arrives in the meantime, it replaces
+   * the first record in the buffer but does <em>not</em> re-start the timer.
+   *
+   * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
+   * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
+   * @tparam K The key type for the KTable to apply this suppression to.
+   * @return a suppression configuration
+   * @see [[org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit]]
+   */
+  def untilTimeLimit[K](timeToWaitForMoreEvents: Duration, bufferConfig: BufferConfigJ[_]): SupressedJ[K] =
+    new SuppressedInternal[K](null, timeToWaitForMoreEvents, bufferConfig, null, false)
+
+  /**
+   * Duplicates the static factory methods inside the Java interface
+   * [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig]].
+   */
+  object BufferConfig {
+
+    /**
+     * Create a size-constrained buffer in terms of the maximum number of keys it will store.
+     *
+     * @param recordLimit maximum number of keys to buffer.
+     * @return size-constrained buffer in terms of the maximum number of keys it will store.
+     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords]]
+     */
+    def maxRecords(recordLimit: Long): EagerBufferConfig =
+      new EagerBufferConfigImpl(recordLimit, Long.MaxValue)
+
+    /**
+     * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
+     *
+     * @param byteLimit maximum number of bytes to buffer.
+     * @return size-constrained buffer in terms of the maximum number of bytes it will use.
+     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes]]
+     */
+    def maxBytes(byteLimit: Long): EagerBufferConfig =
+      new EagerBufferConfigImpl(Long.MaxValue, byteLimit)
+
+    /**
+     * Create a buffer unconstrained by size (either keys or bytes).
+     *
+     * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
+     *
+     * If there isn't enough heap available to meet the demand, the application will encounter an
+     * [[OutOfMemoryError]] and shut down (not guaranteed to be a graceful exit). Also, note that
+     * JVM processes under extreme memory pressure may exhibit poor GC behavior.
+     *
+     * This is a convenient option if you doubt that your buffer will be that large, but also don't
+     * wish to pick particular constraints, such as in testing.
+     *
+     * This buffer is "strict" in the sense that it will enforce the time bound or crash.
+     * It will never emit early.
+     *
+     * @return a buffer unconstrained by size (either keys or bytes).
+     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded]]
+     */
+    def unbounded(): StrictBufferConfig = new StrictBufferConfigImpl()
+  }
+}
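Note how the strict/eager split is enforced at compile time: untilWindowCloses accepts only a StrictBufferConfig, so a buffer that might emit early is rejected by the compiler rather than failing at runtime. A small illustration (the val names are ours):

import org.apache.kafka.streams.scala.kstream.Suppressed
import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig

object StrictVersusEager {
  // Compiles: unbounded() returns a StrictBufferConfig, which never emits early.
  val ok = Suppressed.untilWindowCloses[String](BufferConfig.unbounded())

  // Would not compile: maxRecords(...) returns an EagerBufferConfig, which may
  // emit early when full and therefore cannot guarantee final results.
  // val rejected = Suppressed.untilWindowCloses[String](BufferConfig.maxRecords(100L))

  // Compiles: shutDownWhenFull() turns the bounded config into a strict one.
  val bounded = Suppressed.untilWindowCloses[String](BufferConfig.maxRecords(100L).shutDownWhenFull())
}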
TimeWindowedKStream.scala

@@ -20,7 +20,8 @@
 package org.apache.kafka.streams.scala
 package kstream
 
-import org.apache.kafka.streams.kstream.{TimeWindowedKStream => TimeWindowedKStreamJ, _}
+import org.apache.kafka.streams.kstream.internals.KTableImpl
+import org.apache.kafka.streams.kstream.{KTable => KTableJ, TimeWindowedKStream => TimeWindowedKStreamJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
 

@@ -59,9 +60,13 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
    */
   def count()(implicit materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K], Long] = {
-    val c: KTable[Windowed[K], java.lang.Long] =
+    val javaCountTable: KTableJ[Windowed[K], java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]])
-    c.mapValues[Long](Long2long _)
+    val tableImpl = javaCountTable.asInstanceOf[KTableImpl[Windowed[K], ByteArrayWindowStore, java.lang.Long]]
+    javaCountTable.mapValues[Long](
+      ((l: java.lang.Long) => Long2long(l)).asValueMapper,
+      Materialized.`with`[Windowed[K], Long, ByteArrayKeyValueStore](tableImpl.keySerde(), Serdes.Long)
+    )
   }
 
   /**
KTableTest.scala

@@ -18,8 +18,12 @@
  */
 package org.apache.kafka.streams.scala.kstream
 
+import java.time.Duration
+
+import org.apache.kafka.streams.kstream.{SessionWindows, TimeWindows, Windowed}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig
 import org.apache.kafka.streams.scala.utils.TestDriver
 import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
 import org.junit.runner.RunWith
@@ -139,4 +143,224 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
 
     testDriver.close()
   }
+
+  "windowed KTable#suppress" should "correctly suppress results using Suppressed.untilTimeLimit" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+    val window = TimeWindows.of(Duration.ofSeconds(1L))
+    val suppression = Suppressed.untilTimeLimit[Windowed[String]](Duration.ofSeconds(2L), BufferConfig.unbounded())
+
+    val table: KTable[Windowed[String], Long] = builder
+      .stream[String, String](sourceTopic)
+      .groupByKey
+      .windowedBy(window)
+      .count
+      .suppress(suppression)
+
+    table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      // publish key=1 @ time 0 => count==1
+      testDriver.pipeRecord(sourceTopic, ("1", "value1"), 0L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish key=1 @ time 1 => count==2
+      testDriver.pipeRecord(sourceTopic, ("1", "value2"), 1L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time past the first window, but before the suppression window
+      testDriver.pipeRecord(sourceTopic, ("2", "value1"), 1001L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time riiiight before suppression window ends
+      testDriver.pipeRecord(sourceTopic, ("2", "value2"), 1999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish a late event before suppression window terminates => count==3
+      testDriver.pipeRecord(sourceTopic, ("1", "value3"), 999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time right past the suppression window of the first window.
+      testDriver.pipeRecord(sourceTopic, ("2", "value3"), 2001L)
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "0:1000:1"
+      record.value shouldBe 3L
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "windowed KTable#suppress" should "correctly suppress results using Suppressed.untilWindowCloses" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+    val window = TimeWindows.of(Duration.ofSeconds(1L)).grace(Duration.ofSeconds(1L))
+    val suppression = Suppressed.untilWindowCloses[String](BufferConfig.unbounded())
+
+    val table: KTable[Windowed[String], Long] = builder
+      .stream[String, String](sourceTopic)
+      .groupByKey
+      .windowedBy(window)
+      .count
+      .suppress(suppression)
+
+    table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      // publish key=1 @ time 0 => count==1
+      testDriver.pipeRecord(sourceTopic, ("1", "value1"), 0L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish key=1 @ time 1 => count==2
+      testDriver.pipeRecord(sourceTopic, ("1", "value2"), 1L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time past the window, but before the grace period
+      testDriver.pipeRecord(sourceTopic, ("2", "value1"), 1001L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time riiiight before grace period ends
+      testDriver.pipeRecord(sourceTopic, ("2", "value2"), 1999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish a late event before grace period terminates => count==3
+      testDriver.pipeRecord(sourceTopic, ("1", "value3"), 999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time right past the grace period of the first window.
+      testDriver.pipeRecord(sourceTopic, ("2", "value3"), 2001L)
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "0:1000:1"
+      record.value shouldBe 3L
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "session windowed KTable#suppress" should "correctly suppress results using Suppressed.untilWindowCloses" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+    // Very similar to SuppressScenarioTest.shouldSupportFinalResultsForSessionWindows
+    val window = SessionWindows.`with`(Duration.ofMillis(5L)).grace(Duration.ofMillis(10L))
+    val suppression = Suppressed.untilWindowCloses[String](BufferConfig.unbounded())
+
+    val table: KTable[Windowed[String], Long] = builder
+      .stream[String, String](sourceTopic)
+      .groupByKey
+      .windowedBy(window)
+      .count
+      .suppress(suppression)
+
+    table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      // first window
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 0L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // first window
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 1L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // new window, but grace period hasn't ended for first window
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 8L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // late event for first window, included since grace period hasn't passed
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 2L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // push stream time forward to flush other events through
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 30L)
+      // too-late event should get dropped from the stream
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 3L)
+      // should now have two results
+      val r1 = testDriver.readRecord[String, Long](sinkTopic)
+      r1.key shouldBe "0:2:k1"
+      r1.value shouldBe 3L
+      val r2 = testDriver.readRecord[String, Long](sinkTopic)
+      r2.key shouldBe "8:8:k1"
+      r2.value shouldBe 1
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "non-windowed KTable#suppress" should "correctly suppress results using Suppressed.untilTimeLimit" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+    val suppression = Suppressed.untilTimeLimit[String](Duration.ofSeconds(2L), BufferConfig.unbounded())
+
+    val table: KTable[String, Long] = builder
+      .stream[String, String](sourceTopic)
+      .groupByKey
+      .count
+      .suppress(suppression)
+
+    table.toStream.to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      // publish key=1 @ time 0 => count==1
+      testDriver.pipeRecord(sourceTopic, ("1", "value1"), 0L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish key=1 @ time 1 => count==2
+      testDriver.pipeRecord(sourceTopic, ("1", "value2"), 1L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time forward, but stay within the suppression interval
+      testDriver.pipeRecord(sourceTopic, ("2", "value1"), 1001L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time riiiight before the suppression interval ends
+      testDriver.pipeRecord(sourceTopic, ("2", "value2"), 1999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish another update for key=1 before the interval ends => count==3
+      testDriver.pipeRecord(sourceTopic, ("1", "value3"), 999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time right past the suppression interval.
+      testDriver.pipeRecord(sourceTopic, ("2", "value3"), 2001L)
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "1"
+      record.value shouldBe 3L
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
 }
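One pattern in these tests deserves a callout: suppression is driven by stream time, not wall-clock time, so the tests advance time purely by piping records with later timestamps through the test driver. A condensed sketch of that idiom, using the pipeRecord/readRecord helpers from the test's TestDriver trait (timestamps mirror the untilTimeLimit test above):

// Nothing is emitted while stream time is still inside the suppression interval...
testDriver.pipeRecord(sourceTopic, ("2", "value2"), 1999L)
Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None

// ...but any record stamped past the limit advances stream time and releases
// the buffered result for the earlier window.
testDriver.pipeRecord(sourceTopic, ("2", "value3"), 2001L)
testDriver.readRecord[String, Long](sinkTopic).value shouldBe 3L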
SuppressedTest.scala (new file)

@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+import java.time.Duration
+
+import org.apache.kafka.streams.kstream.internals.suppress.{
+  BufferFullStrategy,
+  EagerBufferConfigImpl,
+  FinalResultsSuppressionBuilder,
+  StrictBufferConfigImpl,
+  SuppressedInternal
+}
+import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class SuppressedTest extends FlatSpec with Matchers {
+
+  "Suppressed.untilWindowCloses" should "produce the correct suppression" in {
+    val bufferConfig = BufferConfig.unbounded()
+    val suppression = Suppressed.untilWindowCloses[String](bufferConfig)
+    suppression shouldEqual new FinalResultsSuppressionBuilder(null, bufferConfig)
+    suppression.withName("soup") shouldEqual new FinalResultsSuppressionBuilder("soup", bufferConfig)
+  }
+
+  "Suppressed.untilTimeLimit" should "produce the correct suppression" in {
+    val bufferConfig = BufferConfig.unbounded()
+    val duration = Duration.ofMillis(1)
+    Suppressed.untilTimeLimit[String](duration, bufferConfig) shouldEqual
+      new SuppressedInternal[String](null, duration, bufferConfig, null, false)
+  }
+
+  "BufferConfig.maxRecords" should "produce the correct buffer config" in {
+    BufferConfig.maxRecords(4) shouldEqual new EagerBufferConfigImpl(4, Long.MaxValue)
+    BufferConfig.maxRecords(4).withMaxBytes(5) shouldEqual new EagerBufferConfigImpl(4, 5)
+  }
+
+  "BufferConfig.maxBytes" should "produce the correct buffer config" in {
+    BufferConfig.maxBytes(4) shouldEqual new EagerBufferConfigImpl(Long.MaxValue, 4)
+    BufferConfig.maxBytes(4).withMaxRecords(5) shouldEqual new EagerBufferConfigImpl(5, 4)
+  }
+
+  "BufferConfig.unbounded" should "produce the correct buffer config" in {
+    BufferConfig.unbounded() shouldEqual
+      new StrictBufferConfigImpl(Long.MaxValue, Long.MaxValue, BufferFullStrategy.SHUT_DOWN)
+  }
+
+  "BufferConfig" should "support very long chains of factory methods" in {
+    val bc1 = BufferConfig
+      .unbounded()
+      .emitEarlyWhenFull()
+      .withMaxRecords(3L)
+      .withMaxBytes(4L)
+      .withMaxRecords(5L)
+      .withMaxBytes(6L)
+    bc1 shouldEqual new EagerBufferConfigImpl(5L, 6L)
+    bc1.shutDownWhenFull() shouldEqual new StrictBufferConfigImpl(5L, 6L, BufferFullStrategy.SHUT_DOWN)
+
+    val bc2 = BufferConfig
+      .maxBytes(4)
+      .withMaxRecords(5)
+      .withMaxBytes(6)
+      .withNoBound()
+      .withMaxBytes(7)
+      .withMaxRecords(8)
+
+    bc2 shouldEqual new StrictBufferConfigImpl(8L, 7L, BufferFullStrategy.SHUT_DOWN)
+    bc2.withNoBound() shouldEqual BufferConfig.unbounded()
+
+    val bc3 = BufferConfig
+      .maxRecords(5L)
+      .withMaxBytes(10L)
+      .emitEarlyWhenFull()
+      .withMaxRecords(11L)
+
+    bc3 shouldEqual new EagerBufferConfigImpl(11L, 10L)
+  }
+}