diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index b5d79371747..c80eaecc70f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -33,11 +33,20 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
     }
 
+    /**
+     * Marker interface for a buffer configuration that will strictly enforce size constraints
+     * (bytes and/or number of records) on the buffer, so it is suitable for reducing duplicate
+     * results downstream, but does not promise to eliminate them entirely.
+     */
+    interface EagerBufferConfig extends BufferConfig<EagerBufferConfig> {
+
+    }
+
     interface BufferConfig<BC extends BufferConfig<BC>> {
         /**
          * Create a size-constrained buffer in terms of the maximum number of keys it will store.
          */
-        static BufferConfig maxRecords(final long recordLimit) {
+        static EagerBufferConfig maxRecords(final long recordLimit) {
             return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE);
         }
 
@@ -49,7 +58,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
         /**
          * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
          */
-        static BufferConfig maxBytes(final long byteLimit) {
+        static EagerBufferConfig maxBytes(final long byteLimit) {
             return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit);
         }
 
@@ -108,7 +117,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
          * This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
          * duplicate results downstream, but does not promise to eliminate them.
          */
-        BufferConfig emitEarlyWhenFull();
+        EagerBufferConfig emitEarlyWhenFull();
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
index 10675ef2bcb..2087945ab88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
@@ -43,7 +43,7 @@ abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC>> impl
     }
 
     @Override
-    public Suppressed.BufferConfig emitEarlyWhenFull() {
+    public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
         return new EagerBufferConfigImpl(maxRecords(), maxBytes());
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
index e94abc1c844..1c1b30c3edc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -20,7 +20,7 @@ import org.apache.kafka.streams.kstream.Suppressed;
 
 import java.util.Objects;
 
-public class EagerBufferConfigImpl extends BufferConfigInternal {
+public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.EagerBufferConfig> implements Suppressed.EagerBufferConfig {
 
     private final long maxRecords;
     private final long maxBytes;
@@ -31,12 +31,12 @@ public class EagerBufferConfigImpl extends BufferConfigInternal {
     }
 
     @Override
-    public Suppressed.BufferConfig withMaxRecords(final long recordLimit) {
+    public Suppressed.EagerBufferConfig withMaxRecords(final long recordLimit) {
         return new EagerBufferConfigImpl(recordLimit, maxBytes);
     }
 
     @Override
-    public Suppressed.BufferConfig withMaxBytes(final long byteLimit) {
+    public Suppressed.EagerBufferConfig withMaxBytes(final long byteLimit) {
         return new EagerBufferConfigImpl(maxRecords, byteLimit);
     }
 
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 5168805d3eb..98fb12b691b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -20,7 +20,8 @@
 package org.apache.kafka.streams.scala
 package kstream
 
-import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _}
+import org.apache.kafka.streams.kstream.internals.KTableImpl
+import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, KTable => KTableJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
 
@@ -46,9 +47,13 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
    */
   def count()(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = {
-    val c: KTable[K, java.lang.Long] =
+    val javaCountTable: KTableJ[K, java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
-    c.mapValues[Long](Long2long _)
+    val tableImpl = javaCountTable.asInstanceOf[KTableImpl[K, ByteArrayKeyValueStore, java.lang.Long]]
+    javaCountTable.mapValues[Long](
+      ((l: java.lang.Long) => Long2long(l)).asValueMapper,
+      Materialized.`with`[K, Long, ByteArrayKeyValueStore](tableImpl.keySerde(), Serdes.Long)
+    )
   }
 
   /**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 9ac27ee40ef..20ee08b7872 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -159,6 +159,18 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   def toStream[KR](mapper: (K, V) => KR): KStream[KR, V] =
     inner.toStream[KR](mapper.asKeyValueMapper)
 
+  /**
+   * Suppress some updates from this changelog stream, determined by the supplied [[Suppressed]] configuration.
+   *
+   * This controls what updates downstream table and stream operations will receive.
+   *
+   * @param suppressed Configuration object determining what, if any, updates to suppress.
+   * @return A new KTable with the desired suppression characteristics.
+   * @see `org.apache.kafka.streams.kstream.KTable#suppress`
+   */
+  def suppress(suppressed: Suppressed[_ >: K]): KTable[K, V] =
+    inner.suppress(suppressed)
+
   /**
    * Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type).
    * Transform the value of each input record into a new value (with possible new type) of the output record.
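Not part of the patch: a minimal usage sketch of the new Scala `suppress` together with the sharpened buffer-config return types. The topic names and size limits are illustrative; the config chain keeps its EagerBufferConfig type because maxRecords and withMaxBytes now advertise it.

import java.time.Duration
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.Suppressed
import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig

val builder = new StreamsBuilder()

// Rate-limit updates to roughly one per key per minute, with the buffer
// capped at 1000 records / 1 MB. Since the chain stays an EagerBufferConfig,
// the compiler knows this config may emit early when full.
builder
  .stream[String, String]("events") // hypothetical topic
  .groupByKey
  .count()
  .suppress(
    Suppressed.untilTimeLimit[String](
      Duration.ofMinutes(1),
      BufferConfig.maxRecords(1000L).withMaxBytes(1000000L).emitEarlyWhenFull()
    )
  )
  .toStream
  .to("events-throttled") // hypothetical topic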
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index 6571df9bbeb..a3e8ae28ee4 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -20,7 +20,8 @@
 package org.apache.kafka.streams.scala
 package kstream
 
-import org.apache.kafka.streams.kstream.{SessionWindowedKStream => SessionWindowedKStreamJ, _}
+import org.apache.kafka.streams.kstream.internals.KTableImpl
+import org.apache.kafka.streams.kstream.{KTable => KTableJ, SessionWindowedKStream => SessionWindowedKStreamJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
 
@@ -60,9 +61,13 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
    */
   def count()(implicit materialized: Materialized[K, Long, ByteArraySessionStore]): KTable[Windowed[K], Long] = {
-    val c: KTable[Windowed[K], java.lang.Long] =
+    val javaCountTable: KTableJ[Windowed[K], java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArraySessionStore]])
-    c.mapValues[Long](Long2long _)
+    val tableImpl = javaCountTable.asInstanceOf[KTableImpl[Windowed[K], ByteArraySessionStore, java.lang.Long]]
+    javaCountTable.mapValues[Long](
+      ((l: java.lang.Long) => Long2long(l)).asValueMapper,
+      Materialized.`with`[Windowed[K], Long, ByteArrayKeyValueStore](tableImpl.keySerde(), Serdes.Long)
+    )
   }
 
   /**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala
new file mode 100644
index 00000000000..2fdc09df478
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+import java.time.Duration
+
+import org.apache.kafka.streams.kstream.{Windowed, Suppressed => SuppressedJ}
+import org.apache.kafka.streams.kstream.Suppressed.{
+  EagerBufferConfig,
+  StrictBufferConfig,
+  BufferConfig => BufferConfigJ
+}
+import org.apache.kafka.streams.kstream.internals.suppress.{
+  EagerBufferConfigImpl,
+  FinalResultsSuppressionBuilder,
+  StrictBufferConfigImpl,
+  SuppressedInternal
+}
+
+/**
+ * Duplicates the static factory methods inside the Java interface [[org.apache.kafka.streams.kstream.Suppressed]].
+ *
+ * This is required for compatibility with Scala 2.11 + Java 1.8, because the Scala 2.11 compiler doesn't support
+ * the use of static methods inside Java interfaces.
+ *
+ * TODO: Deprecate this class if support for Scala 2.11 + Java 1.8 is dropped.
+ */
+object Suppressed {
+
+  /**
+   * Configure the suppression to emit only the "final results" from the window.
+   *
+   * By default all Streams operators emit results whenever new results are available.
+   * This includes windowed operations.
+   *
+   * This configuration will instead emit just one result per key for each window, guaranteeing
+   * to deliver only the final result. This option is suitable for use cases in which the business logic
+   * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
+   *
+   * To accomplish this, the operator will buffer events from the window until the window close (that is,
+   * until the end-time passes, and additionally until the grace period expires). Since windowed operators
+   * are required to reject late events for a window whose grace period is expired, there is an additional
+   * guarantee that the final results emitted from this suppression will match any queryable state upstream.
+   *
+   * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
+   *                     This is required to be a "strict" config, since it would violate the "final results"
+   *                     property to emit early and then issue an update later.
+   * @tparam K The [[Windowed]] key type for the KTable to apply this suppression to.
+   * @return a "final results" mode suppression configuration
+   * @see [[org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses]]
+   */
+  def untilWindowCloses[K](bufferConfig: StrictBufferConfig): SuppressedJ[Windowed[K]] =
+    new FinalResultsSuppressionBuilder[Windowed[K]](null, bufferConfig)
+
+  /**
+   * Configure the suppression to wait `timeToWaitForMoreEvents` amount of time after receiving a record
+   * before emitting it further downstream. If another record for the same key arrives in the meantime, it
+   * replaces the first record in the buffer but does not restart the timer.
+   *
+   * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
+   * @param bufferConfig            A configuration specifying how much space to use for buffering intermediate results.
+   * @tparam K The key type for the KTable to apply this suppression to.
+   * @return a suppression configuration
+   * @see [[org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit]]
+   */
+  def untilTimeLimit[K](timeToWaitForMoreEvents: Duration, bufferConfig: BufferConfigJ[_]): SuppressedJ[K] =
+    new SuppressedInternal[K](null, timeToWaitForMoreEvents, bufferConfig, null, false)
+
+  /**
+   * Duplicates the static factory methods inside the Java interface
+   * [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig]].
+   */
+  object BufferConfig {
+
+    /**
+     * Create a size-constrained buffer in terms of the maximum number of keys it will store.
+     *
+     * @param recordLimit maximum number of keys to buffer.
+     * @return a size-constrained buffer in terms of the maximum number of keys it will store.
+     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords]]
+     */
+    def maxRecords(recordLimit: Long): EagerBufferConfig =
+      new EagerBufferConfigImpl(recordLimit, Long.MaxValue)
+
+    /**
+     * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
+     *
+     * @param byteLimit maximum number of bytes to buffer.
+     * @return a size-constrained buffer in terms of the maximum number of bytes it will use.
+     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes]]
+     */
+    def maxBytes(byteLimit: Long): EagerBufferConfig =
+      new EagerBufferConfigImpl(Long.MaxValue, byteLimit)
+
+    /**
+     * Create a buffer unconstrained by size (either keys or bytes).
+     *
+     * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
+     *
+     * If there isn't enough heap available to meet the demand, the application will encounter an
+     * [[OutOfMemoryError]] and shut down (not guaranteed to be a graceful exit). Also, note that
+     * JVM processes under extreme memory pressure may exhibit poor GC behavior.
+     *
+     * This is a convenient option if you doubt that your buffer will be that large, but also don't
+     * wish to pick particular constraints, such as in testing.
+     *
+     * This buffer is "strict" in the sense that it will enforce the time bound or crash.
+     * It will never emit early.
+     *
+     * @return a buffer unconstrained by size (either keys or bytes).
+     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded]]
+     */
+    def unbounded(): StrictBufferConfig = new StrictBufferConfigImpl()
+  }
+}
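Again not from the patch: a sketch of the "final results" pattern these factories enable. Topic names and window sizes are made up.

import java.time.Duration
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.Suppressed
import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig

val builder = new StreamsBuilder()

// Emit exactly one result per key per hour-long window, and only once the
// window closes (end time plus grace period).
builder
  .stream[String, String]("page-views") // hypothetical topic
  .groupByKey
  .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
  .count()
  .suppress(Suppressed.untilWindowCloses[String](BufferConfig.unbounded()))
  .toStream((windowedKey, _) => windowedKey.key())
  .to("hourly-page-view-counts") // hypothetical topic

untilWindowCloses pairs naturally with unbounded() because any size bound on a strict buffer would force a shutdown when full rather than an early (non-final) emission.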
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index d84416e3908..c160600c2fd 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -20,7 +20,8 @@
 package org.apache.kafka.streams.scala
 package kstream
 
-import org.apache.kafka.streams.kstream.{TimeWindowedKStream => TimeWindowedKStreamJ, _}
+import org.apache.kafka.streams.kstream.internals.KTableImpl
+import org.apache.kafka.streams.kstream.{KTable => KTableJ, TimeWindowedKStream => TimeWindowedKStreamJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
 
@@ -59,9 +60,13 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
    */
   def count()(implicit materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K], Long] = {
-    val c: KTable[Windowed[K], java.lang.Long] =
+    val javaCountTable: KTableJ[Windowed[K], java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]])
-    c.mapValues[Long](Long2long _)
+    val tableImpl = javaCountTable.asInstanceOf[KTableImpl[Windowed[K], ByteArrayWindowStore, java.lang.Long]]
+    javaCountTable.mapValues[Long](
+      ((l: java.lang.Long) => Long2long(l)).asValueMapper,
+      Materialized.`with`[Windowed[K], Long, ByteArrayKeyValueStore](tableImpl.keySerde(), Serdes.Long)
+    )
   }
 
   /**
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index dc080f13310..f2de3deb73c 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -18,8 +18,12 @@
  */
 package org.apache.kafka.streams.scala.kstream
 
+import java.time.Duration
+
+import org.apache.kafka.streams.kstream.{SessionWindows, TimeWindows, Windowed}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig
 import org.apache.kafka.streams.scala.utils.TestDriver
 import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
 import org.junit.runner.RunWith
@@ -139,4 +143,224 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
     testDriver.close()
   }
+
+  "windowed KTable#suppress" should "correctly suppress results using Suppressed.untilTimeLimit" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+    val window = TimeWindows.of(Duration.ofSeconds(1L))
+    val suppression = Suppressed.untilTimeLimit[Windowed[String]](Duration.ofSeconds(2L), BufferConfig.unbounded())
+
+    val table: KTable[Windowed[String], Long] = builder
+      .stream[String, String](sourceTopic)
+      .groupByKey
+      .windowedBy(window)
+      .count
+      .suppress(suppression)
+
+    table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      // publish key=1 @ time 0 => count==1
+      testDriver.pipeRecord(sourceTopic, ("1", "value1"), 0L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish key=1 @ time 1 => count==2
+      testDriver.pipeRecord(sourceTopic, ("1", "value2"), 1L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time past the first window, but before the suppression window
+      testDriver.pipeRecord(sourceTopic, ("2", "value1"), 1001L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time riiiight before suppression window ends
+      testDriver.pipeRecord(sourceTopic, ("2", "value2"), 1999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish a late event before suppression window terminates => count==3
+      testDriver.pipeRecord(sourceTopic, ("1", "value3"), 999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time right past the suppression window of the first window
+      testDriver.pipeRecord(sourceTopic, ("2", "value3"), 2001L)
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "0:1000:1"
+      record.value shouldBe 3L
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "windowed KTable#suppress" should "correctly suppress results using Suppressed.untilWindowCloses" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+    val window = TimeWindows.of(Duration.ofSeconds(1L)).grace(Duration.ofSeconds(1L))
+    val suppression = Suppressed.untilWindowCloses[String](BufferConfig.unbounded())
+
+    val table: KTable[Windowed[String], Long] = builder
+      .stream[String, String](sourceTopic)
+      .groupByKey
+      .windowedBy(window)
+      .count
+      .suppress(suppression)
+
+    table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      // publish key=1 @ time 0 => count==1
+      testDriver.pipeRecord(sourceTopic, ("1", "value1"), 0L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish key=1 @ time 1 => count==2
+      testDriver.pipeRecord(sourceTopic, ("1", "value2"), 1L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time past the window, but before the grace period
+      testDriver.pipeRecord(sourceTopic, ("2", "value1"), 1001L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time riiiight before grace period ends
+      testDriver.pipeRecord(sourceTopic, ("2", "value2"), 1999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish a late event before grace period terminates => count==3
+      testDriver.pipeRecord(sourceTopic, ("1", "value3"), 999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time right past the grace period of the first window
+      testDriver.pipeRecord(sourceTopic, ("2", "value3"), 2001L)
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "0:1000:1"
+      record.value shouldBe 3L
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "session windowed KTable#suppress" should "correctly suppress results using Suppressed.untilWindowCloses" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+    // Very similar to SuppressScenarioTest.shouldSupportFinalResultsForSessionWindows
+    val window = SessionWindows.`with`(Duration.ofMillis(5L)).grace(Duration.ofMillis(10L))
+    val suppression = Suppressed.untilWindowCloses[String](BufferConfig.unbounded())
+
+    val table: KTable[Windowed[String], Long] = builder
+      .stream[String, String](sourceTopic)
+      .groupByKey
+      .windowedBy(window)
+      .count
+      .suppress(suppression)
+
+    table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      // first window
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 0L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // first window
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 1L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // new window, but grace period hasn't ended for first window
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 8L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // late event for first window, included since the grace period hasn't passed
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 2L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // push stream time forward to flush the other events through
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 30L)
+      // too-late event should get dropped from the stream
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 3L)
+      // should now have two results
+      val r1 = testDriver.readRecord[String, Long](sinkTopic)
+      r1.key shouldBe "0:2:k1"
+      r1.value shouldBe 3L
+      val r2 = testDriver.readRecord[String, Long](sinkTopic)
+      r2.key shouldBe "8:8:k1"
+      r2.value shouldBe 1
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "non-windowed KTable#suppress" should "correctly suppress results using Suppressed.untilTimeLimit" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+    val suppression = Suppressed.untilTimeLimit[String](Duration.ofSeconds(2L), BufferConfig.unbounded())
+
+    val table: KTable[String, Long] = builder
+      .stream[String, String](sourceTopic)
+      .groupByKey
+      .count
+      .suppress(suppression)
+
+    table.toStream.to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      // publish key=1 @ time 0 => count==1
+      testDriver.pipeRecord(sourceTopic, ("1", "value1"), 0L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish key=1 @ time 1 => count==2
+      testDriver.pipeRecord(sourceTopic, ("1", "value2"), 1L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time forward, but before the time limit
+      testDriver.pipeRecord(sourceTopic, ("2", "value1"), 1001L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time riiiight before the time limit expires
+      testDriver.pipeRecord(sourceTopic, ("2", "value2"), 1999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish another event for key=1 before the time limit expires => count==3
+      testDriver.pipeRecord(sourceTopic, ("1", "value3"), 999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time right past the time limit
+      testDriver.pipeRecord(sourceTopic, ("2", "value3"), 2001L)
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "1"
+      record.value shouldBe 3L
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
 }
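The SuppressedTest file that follows exercises long factory-method chains; those chains type-check because, after the return-type sharpening above, each step preserves the eager or strict flavor of the config. A hedged sketch of the guarantee (val names are mine, not the patch's):

import org.apache.kafka.streams.kstream.Suppressed.{EagerBufferConfig, StrictBufferConfig}
import org.apache.kafka.streams.scala.kstream.Suppressed
import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig

// The compiler now tracks whether a config may emit early (eager)
// or never will (strict), across arbitrary chaining.
val eager: EagerBufferConfig = BufferConfig.maxRecords(3L).withMaxBytes(4L)
val strict: StrictBufferConfig = eager.shutDownWhenFull()
// untilWindowCloses accepts only strict configs; passing `eager` would not compile.
val finalResults = Suppressed.untilWindowCloses[String](strict)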
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala
new file mode 100644
index 00000000000..5df83471caa
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+import java.time.Duration
+
+import org.apache.kafka.streams.kstream.internals.suppress.{
+  BufferFullStrategy,
+  EagerBufferConfigImpl,
+  FinalResultsSuppressionBuilder,
+  StrictBufferConfigImpl,
+  SuppressedInternal
+}
+import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class SuppressedTest extends FlatSpec with Matchers {
+
+  "Suppressed.untilWindowCloses" should "produce the correct suppression" in {
+    val bufferConfig = BufferConfig.unbounded()
+    val suppression = Suppressed.untilWindowCloses[String](bufferConfig)
+    suppression shouldEqual new FinalResultsSuppressionBuilder(null, bufferConfig)
+    suppression.withName("soup") shouldEqual new FinalResultsSuppressionBuilder("soup", bufferConfig)
+  }
+
+  "Suppressed.untilTimeLimit" should "produce the correct suppression" in {
+    val bufferConfig = BufferConfig.unbounded()
+    val duration = Duration.ofMillis(1)
+    Suppressed.untilTimeLimit[String](duration, bufferConfig) shouldEqual
+      new SuppressedInternal[String](null, duration, bufferConfig, null, false)
+  }
+
+  "BufferConfig.maxRecords" should "produce the correct buffer config" in {
+    BufferConfig.maxRecords(4) shouldEqual new EagerBufferConfigImpl(4, Long.MaxValue)
+    BufferConfig.maxRecords(4).withMaxBytes(5) shouldEqual new EagerBufferConfigImpl(4, 5)
+  }
+
+  "BufferConfig.maxBytes" should "produce the correct buffer config" in {
+    BufferConfig.maxBytes(4) shouldEqual new EagerBufferConfigImpl(Long.MaxValue, 4)
+    BufferConfig.maxBytes(4).withMaxRecords(5) shouldEqual new EagerBufferConfigImpl(5, 4)
+  }
+
+  "BufferConfig.unbounded" should "produce the correct buffer config" in {
+    BufferConfig.unbounded() shouldEqual
+      new StrictBufferConfigImpl(Long.MaxValue, Long.MaxValue, BufferFullStrategy.SHUT_DOWN)
+  }
+
+  "BufferConfig" should "support very long chains of factory methods" in {
+    val bc1 = BufferConfig
+      .unbounded()
+      .emitEarlyWhenFull()
+      .withMaxRecords(3L)
+      .withMaxBytes(4L)
+      .withMaxRecords(5L)
+      .withMaxBytes(6L)
+    bc1 shouldEqual new EagerBufferConfigImpl(5L, 6L)
+    bc1.shutDownWhenFull() shouldEqual new StrictBufferConfigImpl(5L, 6L, BufferFullStrategy.SHUT_DOWN)
+
+    val bc2 = BufferConfig
+      .maxBytes(4)
+      .withMaxRecords(5)
+      .withMaxBytes(6)
+      .withNoBound()
+      .withMaxBytes(7)
+      .withMaxRecords(8)
+
+    bc2 shouldEqual new StrictBufferConfigImpl(8L, 7L, BufferFullStrategy.SHUT_DOWN)
+    bc2.withNoBound() shouldEqual BufferConfig.unbounded()
+
+    val bc3 = BufferConfig
+      .maxRecords(5L)
+      .withMaxBytes(10L)
+      .emitEarlyWhenFull()
+      .withMaxRecords(11L)
+
+    bc3 shouldEqual new EagerBufferConfigImpl(11L, 10L)
+  }
+}