mirror of https://github.com/apache/kafka.git
MINOR: Add Scalafmt to Streams Scala API (#4965)
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
parent
9a18f92935
commit
05c5854d1f
|
@ -29,6 +29,15 @@ buildscript {
|
|||
classpath 'org.scoverage:gradle-scoverage:2.3.0'
|
||||
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
|
||||
classpath 'org.owasp:dependency-check-gradle:3.2.1'
|
||||
classpath "com.diffplug.spotless:spotless-plugin-gradle:3.10.0"
|
||||
}
|
||||
}
|
||||
|
||||
apply plugin: "com.diffplug.gradle.spotless"
|
||||
spotless {
|
||||
scala {
|
||||
target 'streams/**/*.scala'
|
||||
scalafmt('1.5.1').configFile('checkstyle/.scalafmt.conf')
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
// contributor license agreements. See the NOTICE file distributed with
|
||||
// this work for additional information regarding copyright ownership.
|
||||
// The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
// (the "License"); you may not use this file except in compliance with
|
||||
// the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
docstrings = JavaDoc
|
||||
maxColumn = 120
|
||||
continuationIndent.defnSite = 2
|
||||
assumeStandardLibraryStripMargin = true
|
||||
danglingParentheses = true
|
||||
rewrite.rules = [SortImports, RedundantBraces, RedundantParens, SortModifiers]
|
|
@ -30,7 +30,7 @@ import java.lang.{Iterable => JIterable}
|
|||
* more expressive, with less boilerplate and more succinct.
|
||||
* <p>
|
||||
* For Scala 2.11, most of these conversions need to be invoked explicitly, as Scala 2.11 does not
|
||||
* have full support for SAM types.
|
||||
* have full support for SAM types.
|
||||
*/
|
||||
object FunctionConversions {
|
||||
|
||||
|
@ -40,7 +40,7 @@ object FunctionConversions {
|
|||
}
|
||||
}
|
||||
|
||||
implicit class MapperFromFunction[T, U, VR](val f:(T,U) => VR) extends AnyVal {
|
||||
implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal {
|
||||
def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, VR] {
|
||||
override def apply(key: T, value: U): VR = f(key, value)
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ object FunctionConversions {
|
|||
}
|
||||
}
|
||||
|
||||
implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f:(K,V) => (KR, VR)) extends AnyVal {
|
||||
implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal {
|
||||
def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = new KeyValueMapper[K, V, KeyValue[KR, VR]] {
|
||||
override def apply(key: K, value: V): KeyValue[KR, VR] = {
|
||||
val (kr, vr) = f(key, value)
|
||||
|
@ -88,7 +88,7 @@ object FunctionConversions {
|
|||
}
|
||||
}
|
||||
|
||||
implicit class MergerFromFunction[K,VR](val f: (K, VR, VR) => VR) extends AnyVal {
|
||||
implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends AnyVal {
|
||||
def asMerger: Merger[K, VR] = new Merger[K, VR] {
|
||||
override def apply(aggKey: K, aggOne: VR, aggTwo: VR): VR = f(aggKey, aggOne, aggTwo)
|
||||
}
|
||||
|
|
|
@ -77,7 +77,8 @@ object ImplicitConversions {
|
|||
valueSerde: Serde[V]): Materialized[K, V, S] =
|
||||
Materialized.`with`[K, V, S](keySerde, valueSerde)
|
||||
|
||||
implicit def joinedFromKeyValueOtherSerde[K, V, VO]
|
||||
(implicit keySerde: Serde[K], valueSerde: Serde[V], otherValueSerde: Serde[VO]): Joined[K, V, VO] =
|
||||
implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
|
||||
valueSerde: Serde[V],
|
||||
otherValueSerde: Serde[VO]): Joined[K, V, VO] =
|
||||
Joined.`with`(keySerde, valueSerde, otherValueSerde)
|
||||
}
|
||||
|
|
|
@ -25,47 +25,48 @@ import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, S
|
|||
import org.apache.kafka.streams.kstream.WindowedSerdes
|
||||
|
||||
object Serdes {
|
||||
implicit val String: Serde[String] = JSerdes.String()
|
||||
implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
|
||||
implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
|
||||
implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
|
||||
implicit val String: Serde[String] = JSerdes.String()
|
||||
implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
|
||||
implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
|
||||
implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
|
||||
implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
|
||||
implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
|
||||
implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
|
||||
implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
|
||||
implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
|
||||
implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
|
||||
implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
|
||||
implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
|
||||
implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
|
||||
implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
|
||||
implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
|
||||
implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
|
||||
implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
|
||||
|
||||
implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
|
||||
implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]()
|
||||
implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] =
|
||||
new WindowedSerdes.SessionWindowedSerde[T]()
|
||||
|
||||
def fromFn[T >: Null](serializer: T => Array[Byte], deserializer: Array[Byte] => Option[T]): Serde[T] =
|
||||
JSerdes.serdeFrom(
|
||||
new Serializer[T] {
|
||||
override def serialize(topic: String, data: T): Array[Byte] = serializer(data)
|
||||
override def serialize(topic: String, data: T): Array[Byte] = serializer(data)
|
||||
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
|
||||
override def close(): Unit = ()
|
||||
override def close(): Unit = ()
|
||||
},
|
||||
new Deserializer[T] {
|
||||
override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull
|
||||
override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull
|
||||
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
|
||||
override def close(): Unit = ()
|
||||
override def close(): Unit = ()
|
||||
}
|
||||
)
|
||||
|
||||
def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
|
||||
deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
|
||||
deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
|
||||
JSerdes.serdeFrom(
|
||||
new Serializer[T] {
|
||||
override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
|
||||
override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
|
||||
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
|
||||
override def close(): Unit = ()
|
||||
override def close(): Unit = ()
|
||||
},
|
||||
new Deserializer[T] {
|
||||
override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull
|
||||
override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull
|
||||
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
|
||||
override def close(): Unit = ()
|
||||
override def close(): Unit = ()
|
||||
}
|
||||
)
|
||||
}
|
||||
|
|
|
@ -31,18 +31,18 @@ import ImplicitConversions._
|
|||
import scala.collection.JavaConverters._
|
||||
|
||||
/**
|
||||
* Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
|
||||
*/
|
||||
* Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
|
||||
*/
|
||||
class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
|
||||
|
||||
/**
|
||||
* Create a [[kstream.KStream]] from the specified topic.
|
||||
* <p>
|
||||
* The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
|
||||
* The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
|
||||
* key and value deserializers etc. If the implicit is not found in scope, compiler error will result.
|
||||
* <p>
|
||||
* A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly
|
||||
* converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
|
||||
* converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
|
||||
* {{{
|
||||
* // Brings all implicit conversions in scope
|
||||
* import ImplicitConversions._
|
||||
|
@ -88,11 +88,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
|
|||
/**
|
||||
* Create a [[kstream.KTable]] from the specified topic.
|
||||
* <p>
|
||||
* The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
|
||||
* The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
|
||||
* key and value deserializers etc. If the implicit is not found in scope, compiler error will result.
|
||||
* <p>
|
||||
* A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly
|
||||
* converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
|
||||
* converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
|
||||
* {{{
|
||||
* // Brings all implicit conversions in scope
|
||||
* import ImplicitConversions._
|
||||
|
@ -123,8 +123,9 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
|
|||
* @see #table(String)
|
||||
* @see `org.apache.kafka.streams.StreamsBuilder#table`
|
||||
*/
|
||||
def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])
|
||||
(implicit consumed: Consumed[K, V]): KTable[K, V] =
|
||||
def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
|
||||
implicit consumed: Consumed[K, V]
|
||||
): KTable[K, V] =
|
||||
inner.table[K, V](topic, consumed, materialized)
|
||||
|
||||
/**
|
||||
|
@ -139,8 +140,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
|
|||
inner.globalTable(topic, consumed)
|
||||
|
||||
/**
|
||||
* Create a `GlobalKTable` from the specified topic. The resulting `GlobalKTable` will be materialized
|
||||
* in a local `KeyValueStore` configured with the provided instance of `Materialized`. The serializers
|
||||
* Create a `GlobalKTable` from the specified topic. The resulting `GlobalKTable` will be materialized
|
||||
* in a local `KeyValueStore` configured with the provided instance of `Materialized`. The serializers
|
||||
* from the implicit `Consumed` instance will be used.
|
||||
*
|
||||
* @param topic the topic name
|
||||
|
@ -148,12 +149,13 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
|
|||
* @return a `GlobalKTable` for the specified topic
|
||||
* @see `org.apache.kafka.streams.StreamsBuilder#globalTable`
|
||||
*/
|
||||
def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])
|
||||
(implicit consumed: Consumed[K, V]): GlobalKTable[K, V] =
|
||||
def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
|
||||
implicit consumed: Consumed[K, V]
|
||||
): GlobalKTable[K, V] =
|
||||
inner.globalTable(topic, consumed, materialized)
|
||||
|
||||
/**
|
||||
* Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`,
|
||||
* Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`,
|
||||
* `Transformer`, or `ValueTransformer` before it can be used.
|
||||
*
|
||||
* @param builder the builder used to obtain this state store `StateStore` instance
|
||||
|
@ -164,11 +166,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
|
|||
def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilderJ = inner.addStateStore(builder)
|
||||
|
||||
/**
|
||||
* Adds a global `StateStore` to the topology. Global stores should not be added to `Processor, `Transformer`,
|
||||
* Adds a global `StateStore` to the topology. Global stores should not be added to `Processor, `Transformer`,
|
||||
* or `ValueTransformer` (in contrast to regular stores).
|
||||
*
|
||||
* @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
|
||||
*/
|
||||
*/
|
||||
def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
|
||||
topic: String,
|
||||
consumed: Consumed[_, _],
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _}
|
|||
import org.apache.kafka.streams.scala.ImplicitConversions._
|
||||
import org.apache.kafka.streams.scala.FunctionConversions._
|
||||
|
||||
|
||||
/**
|
||||
* Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.
|
||||
*
|
||||
|
@ -41,7 +40,7 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
|
|||
* The result is written into a local `KeyValueStore` (which is basically an ever-updating materialized view)
|
||||
* provided by the given `materialized`.
|
||||
*
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
|
||||
* represent the latest (rolling) count (i.e., number of records) for each key
|
||||
* @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
|
||||
|
@ -55,8 +54,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
|
|||
/**
|
||||
* Combine the values of records in this stream by the grouped key.
|
||||
*
|
||||
* @param reducer a function `(V, V) => V` that computes a new aggregate result.
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @param reducer a function `(V, V) => V` that computes a new aggregate result.
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
|
||||
* latest (rolling) aggregate for each key
|
||||
* @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
|
||||
|
|
|
@ -39,7 +39,7 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
|
|||
* Count number of records of the original [[KTable]] that got [[KTable#groupBy]] to
|
||||
* the same key into a new instance of [[KTable]].
|
||||
*
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
|
||||
* represent the latest (rolling) count (i.e., number of records) for each key
|
||||
* @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
|
||||
|
|
|
@ -46,7 +46,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* @param predicate a filter that is applied to each record
|
||||
* @return a [[KStream]] that contains only those records that satisfy the given predicate
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#filter`
|
||||
*/
|
||||
*/
|
||||
def filter(predicate: (K, V) => Boolean): KStream[K, V] =
|
||||
inner.filter(predicate.asPredicate)
|
||||
|
||||
|
@ -57,7 +57,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* @param predicate a filter that is applied to each record
|
||||
* @return a [[KStream]] that contains only those records that do <em>not</em> satisfy the given predicate
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#filterNot`
|
||||
*/
|
||||
*/
|
||||
def filterNot(predicate: (K, V) => Boolean): KStream[K, V] =
|
||||
inner.filterNot(predicate.asPredicate)
|
||||
|
||||
|
@ -70,7 +70,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* @param mapper a function `(K, V) => KR` that computes a new key for each record
|
||||
* @return a [[KStream]] that contains records with new key (possibly of different type) and unmodified value
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#selectKey`
|
||||
*/
|
||||
*/
|
||||
def selectKey[KR](mapper: (K, V) => KR): KStream[KR, V] =
|
||||
inner.selectKey[KR](mapper.asKeyValueMapper)
|
||||
|
||||
|
@ -83,7 +83,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* @param mapper a function `(K, V) => (KR, VR)` that computes a new output record
|
||||
* @return a [[KStream]] that contains records with new key and value (possibly both of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#map`
|
||||
*/
|
||||
*/
|
||||
def map[KR, VR](mapper: (K, V) => (KR, VR)): KStream[KR, VR] = {
|
||||
val kvMapper = mapper.tupled andThen tuple2ToKeyValue
|
||||
inner.map[KR, VR](((k: K, v: V) => kvMapper(k, v)).asKeyValueMapper)
|
||||
|
@ -97,7 +97,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* @param mapper, a function `V => VR` that computes a new output value
|
||||
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#mapValues`
|
||||
*/
|
||||
*/
|
||||
def mapValues[VR](mapper: V => VR): KStream[K, VR] =
|
||||
inner.mapValues[VR](mapper.asValueMapper)
|
||||
|
||||
|
@ -109,7 +109,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* @param mapper, a function `(K, V) => VR` that computes a new output value
|
||||
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#mapValues`
|
||||
*/
|
||||
*/
|
||||
def mapValues[VR](mapper: (K, V) => VR): KStream[K, VR] =
|
||||
inner.mapValues[VR](mapper.asValueMapperWithKey)
|
||||
|
||||
|
@ -122,10 +122,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* @param mapper function `(K, V) => Iterable[(KR, VR)]` that computes the new output records
|
||||
* @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#flatMap`
|
||||
*/
|
||||
*/
|
||||
def flatMap[KR, VR](mapper: (K, V) => Iterable[(KR, VR)]): KStream[KR, VR] = {
|
||||
val kvMapper = mapper.tupled andThen (iter => iter.map(tuple2ToKeyValue).asJava)
|
||||
inner.flatMap[KR, VR](((k: K, v: V) => kvMapper(k , v)).asKeyValueMapper)
|
||||
inner.flatMap[KR, VR](((k: K, v: V) => kvMapper(k, v)).asKeyValueMapper)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -139,7 +139,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* @param mapper a function `V => Iterable[VR]` that computes the new output values
|
||||
* @return a [[KStream]] that contains more or less records with unmodified keys and new values of different type
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
|
||||
*/
|
||||
*/
|
||||
def flatMapValues[VR](mapper: V => Iterable[VR]): KStream[K, VR] =
|
||||
inner.flatMapValues[VR](mapper.asValueMapper)
|
||||
|
||||
|
@ -154,7 +154,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* @param mapper a function `(K, V) => Iterable[VR]` that computes the new output values
|
||||
* @return a [[KStream]] that contains more or less records with unmodified keys and new values of different type
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
|
||||
*/
|
||||
*/
|
||||
def flatMapValues[VR](mapper: (K, V) => Iterable[VR]): KStream[K, VR] =
|
||||
inner.flatMapValues[VR](mapper.asValueMapperWithKey)
|
||||
|
||||
|
@ -187,7 +187,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream))
|
||||
|
||||
/**
|
||||
* Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Produced` instance for
|
||||
* Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Produced` instance for
|
||||
* configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`
|
||||
* <p>
|
||||
* The user can either supply the `Produced` instance as an implicit in scope or she can also provide implicit
|
||||
|
@ -219,7 +219,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
inner.through(topic, produced)
|
||||
|
||||
/**
|
||||
* Materialize this stream to a topic using the `Produced` instance for
|
||||
* Materialize this stream to a topic using the `Produced` instance for
|
||||
* configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`
|
||||
* <p>
|
||||
* The user can either supply the `Produced` instance as an implicit in scope or she can also provide implicit
|
||||
|
@ -250,34 +250,34 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
inner.to(topic, produced)
|
||||
|
||||
/**
|
||||
* Dynamically materialize this stream to topics using the `Produced` instance for
|
||||
* configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`.
|
||||
* The topic names for each record to send to is dynamically determined based on the given mapper.
|
||||
* <p>
|
||||
* The user can either supply the `Produced` instance as an implicit in scope or she can also provide implicit
|
||||
* key and value serdes that will be converted to a `Produced` instance implicitly.
|
||||
* <p>
|
||||
* {{{
|
||||
* Example:
|
||||
*
|
||||
* // brings implicit serdes in scope
|
||||
* import Serdes._
|
||||
*
|
||||
* //..
|
||||
* val clicksPerRegion: KTable[String, Long] = //..
|
||||
*
|
||||
* // Implicit serdes in scope will generate an implicit Produced instance, which
|
||||
* // will be passed automatically to the call of through below
|
||||
* clicksPerRegion.to(topicChooser)
|
||||
*
|
||||
* // Similarly you can create an implicit Produced and it will be passed implicitly
|
||||
* // to the through call
|
||||
* }}}
|
||||
*
|
||||
* @param extractor the extractor to determine the name of the Kafka topic to write to for reach record
|
||||
* @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#to`
|
||||
*/
|
||||
* Dynamically materialize this stream to topics using the `Produced` instance for
|
||||
* configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`.
|
||||
* The topic names for each record to send to is dynamically determined based on the given mapper.
|
||||
* <p>
|
||||
* The user can either supply the `Produced` instance as an implicit in scope or she can also provide implicit
|
||||
* key and value serdes that will be converted to a `Produced` instance implicitly.
|
||||
* <p>
|
||||
* {{{
|
||||
* Example:
|
||||
*
|
||||
* // brings implicit serdes in scope
|
||||
* import Serdes._
|
||||
*
|
||||
* //..
|
||||
* val clicksPerRegion: KTable[String, Long] = //..
|
||||
*
|
||||
* // Implicit serdes in scope will generate an implicit Produced instance, which
|
||||
* // will be passed automatically to the call of through below
|
||||
* clicksPerRegion.to(topicChooser)
|
||||
*
|
||||
* // Similarly you can create an implicit Produced and it will be passed implicitly
|
||||
* // to the through call
|
||||
* }}}
|
||||
*
|
||||
* @param extractor the extractor to determine the name of the Kafka topic to write to for reach record
|
||||
* @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#to`
|
||||
*/
|
||||
def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit =
|
||||
inner.to(extractor, produced)
|
||||
|
||||
|
@ -292,25 +292,23 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* @param stateStoreNames the names of the state stores used by the processor
|
||||
* @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#transform`
|
||||
*/
|
||||
def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)],
|
||||
stateStoreNames: String*): KStream[K1, V1] = {
|
||||
val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] = new TransformerSupplier[K, V, KeyValue[K1, V1]] {
|
||||
override def get(): Transformer[K, V, KeyValue[K1, V1]] = {
|
||||
new Transformer[K, V, KeyValue[K1, V1]] {
|
||||
override def transform(key: K, value: V): KeyValue[K1, V1] = {
|
||||
transformer.transform(key, value) match {
|
||||
case (k1, v1) => KeyValue.pair(k1, v1)
|
||||
case _ => null
|
||||
}
|
||||
*/
|
||||
def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)], stateStoreNames: String*): KStream[K1, V1] = {
|
||||
val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] =
|
||||
new TransformerSupplier[K, V, KeyValue[K1, V1]] {
|
||||
override def get(): Transformer[K, V, KeyValue[K1, V1]] =
|
||||
new Transformer[K, V, KeyValue[K1, V1]] {
|
||||
override def transform(key: K, value: V): KeyValue[K1, V1] =
|
||||
transformer.transform(key, value) match {
|
||||
case (k1, v1) => KeyValue.pair(k1, v1)
|
||||
case _ => null
|
||||
}
|
||||
|
||||
override def init(context: ProcessorContext): Unit = transformer.init(context)
|
||||
|
||||
override def close(): Unit = transformer.close()
|
||||
}
|
||||
|
||||
override def init(context: ProcessorContext): Unit = transformer.init(context)
|
||||
|
||||
override def close(): Unit = transformer.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
inner.transform(transformerSupplierJ, stateStoreNames: _*)
|
||||
}
|
||||
|
||||
|
@ -318,14 +316,14 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* Transform the value of each input record into a new value (with possible new type) of the output record.
|
||||
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
|
||||
* record value and computes a new value for it.
|
||||
* In order to assign a state, the state must be created and registered
|
||||
* beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
|
||||
* In order to assign a state, the state must be created and registered
|
||||
* beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
|
||||
*
|
||||
* @param valueTransformerSupplier a instance of `ValueTransformerSupplier` that generates a `ValueTransformer`
|
||||
* @param stateStoreNames the names of the state stores used by the processor
|
||||
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
|
||||
*/
|
||||
*/
|
||||
def transformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, VR],
|
||||
stateStoreNames: String*): KStream[K, VR] =
|
||||
inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)
|
||||
|
@ -334,29 +332,28 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* Transform the value of each input record into a new value (with possible new type) of the output record.
|
||||
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
|
||||
* record value and computes a new value for it.
|
||||
* In order to assign a state, the state must be created and registered
|
||||
* beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
|
||||
* In order to assign a state, the state must be created and registered
|
||||
* beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
|
||||
*
|
||||
* @param valueTransformerSupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
|
||||
* @param stateStoreNames the names of the state stores used by the processor
|
||||
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
|
||||
*/
|
||||
*/
|
||||
def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
|
||||
stateStoreNames: String*): KStream[K, VR] = {
|
||||
stateStoreNames: String*): KStream[K, VR] =
|
||||
inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)
|
||||
}
|
||||
|
||||
/**
|
||||
* Process all records in this stream, one record at a time, by applying a `Processor` (provided by the given
|
||||
* `processorSupplier`).
|
||||
* In order to assign a state, the state must be created and registered
|
||||
* beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
|
||||
* In order to assign a state, the state must be created and registered
|
||||
* beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
|
||||
*
|
||||
* @param processorSupplier a function that generates a [[org.apache.kafka.stream.Processor]]
|
||||
* @param stateStoreNames the names of the state store used by the processor
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#process`
|
||||
*/
|
||||
*/
|
||||
def process(processorSupplier: () => Processor[K, V], stateStoreNames: String*): Unit = {
|
||||
val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K, V] {
|
||||
override def get(): Processor[K, V] = processorSupplier()
|
||||
|
@ -365,7 +362,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
}
|
||||
|
||||
/**
|
||||
* Group the records by their current key into a [[KGroupedStream]]
|
||||
* Group the records by their current key into a [[KGroupedStream]]
|
||||
* <p>
|
||||
* The user can either supply the `Serialized` instance as an implicit in scope or she can also provide an implicit
|
||||
* serdes that will be converted to a `Serialized` instance implicitly.
|
||||
|
@ -390,10 +387,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* // to the groupByKey call
|
||||
* }}}
|
||||
*
|
||||
* @param (implicit) serialized the instance of Serialized that gives the serdes
|
||||
* @param (implicit) serialized the instance of Serialized that gives the serdes
|
||||
* @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#groupByKey`
|
||||
*/
|
||||
*/
|
||||
def groupByKey(implicit serialized: Serialized[K, V]): KGroupedStream[K, V] =
|
||||
inner.groupByKey(serialized)
|
||||
|
||||
|
@ -427,18 +424,18 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* @param selector a function that computes a new key for grouping
|
||||
* @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#groupBy`
|
||||
*/
|
||||
*/
|
||||
def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Serialized[KR, V]): KGroupedStream[KR, V] =
|
||||
inner.groupBy(selector.asKeyValueMapper, serialized)
|
||||
|
||||
/**
|
||||
* Join records of this stream with another [[KStream]]'s records using windowed inner equi join with
|
||||
* Join records of this stream with another [[KStream]]'s records using windowed inner equi join with
|
||||
* serializers and deserializers supplied by the implicit `Joined` instance.
|
||||
*
|
||||
* @param otherStream the [[KStream]] to be joined with this stream
|
||||
* @param joiner a function that computes the join result for a pair of matching records
|
||||
* @param windows the specification of the `JoinWindows`
|
||||
* @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
|
||||
* @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
|
||||
* inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
|
||||
* key serde, value serde and other value serde in implicit scope and they will be
|
||||
* converted to the instance of `Joined` through implicit conversion
|
||||
|
@ -453,17 +450,17 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
|
||||
|
||||
/**
|
||||
* Join records of this stream with another [[KTable]]'s records using inner equi join with
|
||||
* Join records of this stream with another [[KTable]]'s records using inner equi join with
|
||||
* serializers and deserializers supplied by the implicit `Joined` instance.
|
||||
*
|
||||
* @param table the [[KTable]] to be joined with this stream
|
||||
* @param joiner a function that computes the join result for a pair of matching records
|
||||
* @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
|
||||
* @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
|
||||
* inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
|
||||
* key serde, value serde and other value serde in implicit scope and they will be
|
||||
* converted to the instance of `Joined` through implicit conversion
|
||||
* @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
|
||||
* one for each matched record-pair with the same key
|
||||
* one for each matched record-pair with the same key
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#join`
|
||||
*/
|
||||
def join[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
|
||||
|
@ -479,7 +476,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
|
||||
* one output for each input [[KStream]] record
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#join`
|
||||
*/
|
||||
*/
|
||||
def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
|
||||
keyValueMapper: (K, V) => GK,
|
||||
joiner: (V, GV) => RV
|
||||
|
@ -491,20 +488,20 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
)
|
||||
|
||||
/**
|
||||
* Join records of this stream with another [[KStream]]'s records using windowed left equi join with
|
||||
* Join records of this stream with another [[KStream]]'s records using windowed left equi join with
|
||||
* serializers and deserializers supplied by the implicit `Joined` instance.
|
||||
*
|
||||
* @param otherStream the [[KStream]] to be joined with this stream
|
||||
* @param joiner a function that computes the join result for a pair of matching records
|
||||
* @param windows the specification of the `JoinWindows`
|
||||
* @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
|
||||
* @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
|
||||
* inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
|
||||
* key serde, value serde and other value serde in implicit scope and they will be
|
||||
* converted to the instance of `Joined` through implicit conversion
|
||||
* @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
|
||||
* one for each matched record-pair with the same key and within the joining window intervals
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
|
||||
*/
|
||||
*/
|
||||
def leftJoin[VO, VR](otherStream: KStream[K, VO])(
|
||||
joiner: (V, VO) => VR,
|
||||
windows: JoinWindows
|
||||
|
@ -512,19 +509,19 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
|
||||
|
||||
/**
|
||||
* Join records of this stream with another [[KTable]]'s records using left equi join with
|
||||
* Join records of this stream with another [[KTable]]'s records using left equi join with
|
||||
* serializers and deserializers supplied by the implicit `Joined` instance.
|
||||
*
|
||||
* @param table the [[KTable]] to be joined with this stream
|
||||
* @param joiner a function that computes the join result for a pair of matching records
|
||||
* @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
|
||||
* @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
|
||||
* inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
|
||||
* key serde, value serde and other value serde in implicit scope and they will be
|
||||
* converted to the instance of `Joined` through implicit conversion
|
||||
* @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
|
||||
* one for each matched record-pair with the same key
|
||||
* one for each matched record-pair with the same key
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
|
||||
*/
|
||||
*/
|
||||
def leftJoin[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
|
||||
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
|
||||
|
||||
|
@ -538,7 +535,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
* @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
|
||||
* one output for each input [[KStream]] record
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
|
||||
*/
|
||||
*/
|
||||
def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
|
||||
keyValueMapper: (K, V) => GK,
|
||||
joiner: (V, GV) => RV
|
||||
|
@ -546,20 +543,20 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
inner.leftJoin[GK, GV, RV](globalKTable, keyValueMapper.asKeyValueMapper, joiner.asValueJoiner)
|
||||
|
||||
/**
|
||||
* Join records of this stream with another [[KStream]]'s records using windowed outer equi join with
|
||||
* Join records of this stream with another [[KStream]]'s records using windowed outer equi join with
|
||||
* serializers and deserializers supplied by the implicit `Joined` instance.
|
||||
*
|
||||
* @param otherStream the [[KStream]] to be joined with this stream
|
||||
* @param joiner a function that computes the join result for a pair of matching records
|
||||
* @param windows the specification of the `JoinWindows`
|
||||
* @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
|
||||
* @param joined an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
|
||||
* inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
|
||||
* key serde, value serde and other value serde in implicit scope and they will be
|
||||
* converted to the instance of `Joined` through implicit conversion
|
||||
* @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
|
||||
* one for each matched record-pair with the same key and within the joining window intervals
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#outerJoin`
|
||||
*/
|
||||
*/
|
||||
def outerJoin[VO, VR](otherStream: KStream[K, VO])(
|
||||
joiner: (V, VO) => VR,
|
||||
windows: JoinWindows
|
||||
|
@ -569,8 +566,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
|
|||
/**
|
||||
* Merge this stream and the given stream into one larger stream.
|
||||
* <p>
|
||||
* There is no ordering guarantee between records from this `KStream` and records from the provided `KStream`
|
||||
* in the merged stream. Relative order is preserved within each input stream though (ie, records within
|
||||
* There is no ordering guarantee between records from this `KStream` and records from the provided `KStream`
|
||||
* in the merged stream. Relative order is preserved within each input stream though (ie, records within
|
||||
* one input stream are processed in order).
|
||||
*
|
||||
* @param stream a stream which is to be merged into this stream
|
||||
|
|
|
@ -44,10 +44,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
* @param predicate a filter that is applied to each record
|
||||
* @return a [[KTable]] that contains only those records that satisfy the given predicate
|
||||
* @see `org.apache.kafka.streams.kstream.KTable#filter`
|
||||
*/
|
||||
def filter(predicate: (K, V) => Boolean): KTable[K, V] = {
|
||||
*/
|
||||
def filter(predicate: (K, V) => Boolean): KTable[K, V] =
|
||||
inner.filter(predicate(_, _))
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new [[KTable]] that consists all records of this [[KTable]] which satisfies the given
|
||||
|
@ -55,12 +54,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
*
|
||||
* @param predicate a filter that is applied to each record
|
||||
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
|
||||
* should be materialized.
|
||||
* should be materialized.
|
||||
* @return a [[KTable]] that contains only those records that satisfy the given predicate
|
||||
* @see `org.apache.kafka.streams.kstream.KTable#filter`
|
||||
*/
|
||||
def filter(predicate: (K, V) => Boolean,
|
||||
materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
|
||||
*/
|
||||
def filter(predicate: (K, V) => Boolean, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
|
||||
inner.filter(predicate.asPredicate, materialized)
|
||||
|
||||
/**
|
||||
|
@ -70,7 +68,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
* @param predicate a filter that is applied to each record
|
||||
* @return a [[KTable]] that contains only those records that do <em>not</em> satisfy the given predicate
|
||||
* @see `org.apache.kafka.streams.kstream.KTable#filterNot`
|
||||
*/
|
||||
*/
|
||||
def filterNot(predicate: (K, V) => Boolean): KTable[K, V] =
|
||||
inner.filterNot(predicate(_, _))
|
||||
|
||||
|
@ -80,12 +78,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
*
|
||||
* @param predicate a filter that is applied to each record
|
||||
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
|
||||
* should be materialized.
|
||||
* should be materialized.
|
||||
* @return a [[KTable]] that contains only those records that do <em>not</em> satisfy the given predicate
|
||||
* @see `org.apache.kafka.streams.kstream.KTable#filterNot`
|
||||
*/
|
||||
def filterNot(predicate: (K, V) => Boolean,
|
||||
materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
|
||||
*/
|
||||
def filterNot(predicate: (K, V) => Boolean, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
|
||||
inner.filterNot(predicate.asPredicate, materialized)
|
||||
|
||||
/**
|
||||
|
@ -97,7 +94,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
* @param mapper, a function `V => VR` that computes a new output value
|
||||
* @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
|
||||
*/
|
||||
*/
|
||||
def mapValues[VR](mapper: V => VR): KTable[K, VR] =
|
||||
inner.mapValues[VR](mapper.asValueMapper)
|
||||
|
||||
|
@ -109,12 +106,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
*
|
||||
* @param mapper, a function `V => VR` that computes a new output value
|
||||
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
|
||||
* should be materialized.
|
||||
* should be materialized.
|
||||
* @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
|
||||
*/
|
||||
def mapValues[VR](mapper: V => VR,
|
||||
materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
|
||||
*/
|
||||
def mapValues[VR](mapper: V => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
|
||||
inner.mapValues[VR](mapper.asValueMapper, materialized)
|
||||
|
||||
/**
|
||||
|
@ -126,7 +122,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
* @param mapper, a function `(K, V) => VR` that computes a new output value
|
||||
* @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
|
||||
*/
|
||||
*/
|
||||
def mapValues[VR](mapper: (K, V) => VR): KTable[K, VR] =
|
||||
inner.mapValues[VR](mapper.asValueMapperWithKey)
|
||||
|
||||
|
@ -138,12 +134,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
*
|
||||
* @param mapper, a function `(K, V) => VR` that computes a new output value
|
||||
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
|
||||
* should be materialized.
|
||||
* should be materialized.
|
||||
* @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
|
||||
*/
|
||||
def mapValues[VR](mapper: (K, V) => VR,
|
||||
materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
|
||||
*/
|
||||
def mapValues[VR](mapper: (K, V) => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
|
||||
inner.mapValues[VR](mapper.asValueMapperWithKey)
|
||||
|
||||
/**
|
||||
|
@ -165,57 +160,55 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
inner.toStream[KR](mapper.asKeyValueMapper)
|
||||
|
||||
/**
|
||||
* Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type).
|
||||
* Transform the value of each input record into a new value (with possible new type) of the output record.
|
||||
* A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`) is applied to each input
|
||||
* record value and computes a new value for it.
|
||||
* This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, allowing access to additional state-stores,
|
||||
* and to the `ProcessorContext`.
|
||||
* If the downstream topology uses aggregation functions, (e.g. `KGroupedTable#reduce`, `KGroupedTable#aggregate`, etc),
|
||||
* care must be taken when dealing with state, (either held in state-stores or transformer instances), to ensure correct
|
||||
* aggregate results.
|
||||
* In contrast, if the resulting KTable is materialized, (cf. `#transformValues(ValueTransformerWithKeySupplier, Materialized, String...)`),
|
||||
* such concerns are handled for you.
|
||||
* In order to assign a state, the state must be created and registered
|
||||
* beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
|
||||
*
|
||||
* @param valueTransformerWithKeySupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`.
|
||||
* At least one transformer instance will be created per streaming task.
|
||||
* Transformer implementations doe not need to be thread-safe.
|
||||
* @param stateStoreNames the names of the state stores used by the processor
|
||||
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
|
||||
*/
|
||||
* Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type).
|
||||
* Transform the value of each input record into a new value (with possible new type) of the output record.
|
||||
* A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`) is applied to each input
|
||||
* record value and computes a new value for it.
|
||||
* This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, allowing access to additional state-stores,
|
||||
* and to the `ProcessorContext`.
|
||||
* If the downstream topology uses aggregation functions, (e.g. `KGroupedTable#reduce`, `KGroupedTable#aggregate`, etc),
|
||||
* care must be taken when dealing with state, (either held in state-stores or transformer instances), to ensure correct
|
||||
* aggregate results.
|
||||
* In contrast, if the resulting KTable is materialized, (cf. `#transformValues(ValueTransformerWithKeySupplier, Materialized, String...)`),
|
||||
* such concerns are handled for you.
|
||||
* In order to assign a state, the state must be created and registered
|
||||
* beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
|
||||
*
|
||||
* @param valueTransformerWithKeySupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`.
|
||||
* At least one transformer instance will be created per streaming task.
|
||||
* Transformer implementations doe not need to be thread-safe.
|
||||
* @param stateStoreNames the names of the state stores used by the processor
|
||||
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
|
||||
*/
|
||||
def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
|
||||
stateStoreNames: String*): KTable[K, VR] = {
|
||||
stateStoreNames: String*): KTable[K, VR] =
|
||||
inner.transformValues[VR](valueTransformerWithKeySupplier, stateStoreNames: _*)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type).
|
||||
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
|
||||
* record value and computes a new value for it.
|
||||
* This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, allowing stateful, rather than stateless,
|
||||
* record-by-record operation, access to additional state-stores, and access to the `ProcessorContext`.
|
||||
* In order to assign a state, the state must be created and registered
|
||||
* beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
|
||||
* The resulting `KTable` is materialized into another state store (additional to the provided state store names)
|
||||
* as specified by the user via `Materialized` parameter, and is queryable through its given name.
|
||||
*
|
||||
* @param valueTransformerWithKeySupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
|
||||
* At least one transformer instance will be created per streaming task.
|
||||
* Transformer implementations doe not need to be thread-safe.
|
||||
* @param materialized an instance of `Materialized` used to describe how the state store of the
|
||||
* resulting table should be materialized.
|
||||
* @param stateStoreNames the names of the state stores used by the processor
|
||||
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
|
||||
*/
|
||||
* Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type).
|
||||
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
|
||||
* record value and computes a new value for it.
|
||||
* This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, allowing stateful, rather than stateless,
|
||||
* record-by-record operation, access to additional state-stores, and access to the `ProcessorContext`.
|
||||
* In order to assign a state, the state must be created and registered
|
||||
* beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
|
||||
* The resulting `KTable` is materialized into another state store (additional to the provided state store names)
|
||||
* as specified by the user via `Materialized` parameter, and is queryable through its given name.
|
||||
*
|
||||
* @param valueTransformerWithKeySupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
|
||||
* At least one transformer instance will be created per streaming task.
|
||||
* Transformer implementations doe not need to be thread-safe.
|
||||
* @param materialized an instance of `Materialized` used to describe how the state store of the
|
||||
* resulting table should be materialized.
|
||||
* @param stateStoreNames the names of the state stores used by the processor
|
||||
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
|
||||
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
|
||||
*/
|
||||
def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
|
||||
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
|
||||
stateStoreNames: String*): KTable[K, VR] = {
|
||||
stateStoreNames: String*): KTable[K, VR] =
|
||||
inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, stateStoreNames: _*)
|
||||
}
|
||||
|
||||
/**
|
||||
* Re-groups the records of this [[KTable]] using the provided key/value mapper
|
||||
|
@ -247,7 +240,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
* @param other the other [[KTable]] to be joined with this [[KTable]]
|
||||
* @param joiner a function that computes the join result for a pair of matching records
|
||||
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
|
||||
* should be materialized.
|
||||
* should be materialized.
|
||||
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
|
||||
* one for each matched record-pair with the same key
|
||||
* @see `org.apache.kafka.streams.kstream.KTable#join`
|
||||
|
@ -276,7 +269,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
* @param other the other [[KTable]] to be joined with this [[KTable]]
|
||||
* @param joiner a function that computes the join result for a pair of matching records
|
||||
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
|
||||
* should be materialized.
|
||||
* should be materialized.
|
||||
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
|
||||
* one for each matched record-pair with the same key
|
||||
* @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
|
||||
|
@ -305,11 +298,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
* @param other the other [[KTable]] to be joined with this [[KTable]]
|
||||
* @param joiner a function that computes the join result for a pair of matching records
|
||||
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
|
||||
* should be materialized.
|
||||
* should be materialized.
|
||||
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
|
||||
* one for each matched record-pair with the same key
|
||||
* @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
|
||||
*/
|
||||
*/
|
||||
def outerJoin[VO, VR](other: KTable[K, VO])(
|
||||
joiner: (V, VO) => VR,
|
||||
materialized: Materialized[K, VR, ByteArrayKeyValueStore]
|
||||
|
@ -323,4 +316,3 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
|
|||
*/
|
||||
def queryableStoreName: String = inner.queryableStoreName
|
||||
}
|
||||
|
||||
|
|
|
@ -46,8 +46,7 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
|
|||
* the latest (rolling) aggregate for each key within a window
|
||||
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
|
||||
*/
|
||||
def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
|
||||
merger: (K, VR, VR) => VR)(
|
||||
def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)(
|
||||
implicit materialized: Materialized[K, VR, ByteArraySessionStore]
|
||||
): KTable[Windowed[K], VR] =
|
||||
inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized)
|
||||
|
@ -55,7 +54,7 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
|
|||
/**
|
||||
* Count the number of records in this stream by the grouped key into `SessionWindows`.
|
||||
*
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @return a windowed [[KTable]] that contains "update" records with unmodified keys and `Long` values
|
||||
* that represent the latest (rolling) count (i.e., number of records) for each key within a window
|
||||
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
|
||||
|
@ -69,8 +68,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
|
|||
/**
|
||||
* Combine values of this stream by the grouped key into {@link SessionWindows}.
|
||||
*
|
||||
* @param reducer a reducer function that computes a new aggregate result.
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @param reducer a reducer function that computes a new aggregate result.
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @return a windowed [[KTable]] that contains "update" records with unmodified keys, and values that represent
|
||||
* the latest (rolling) aggregate for each key within a window
|
||||
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
|
||||
|
|
|
@ -53,11 +53,11 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
|
|||
/**
|
||||
* Count the number of records in this stream by the grouped key and the defined windows.
|
||||
*
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
|
||||
* represent the latest (rolling) count (i.e., number of records) for each key
|
||||
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
|
||||
*/
|
||||
*/
|
||||
def count()(implicit materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K], Long] = {
|
||||
val c: KTable[Windowed[K], java.lang.Long] =
|
||||
inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]])
|
||||
|
@ -68,7 +68,7 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
|
|||
* Combine the values of records in this stream by the grouped key.
|
||||
*
|
||||
* @param reducer a function that computes a new aggregate result
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @param materialized an instance of `Materialized` used to materialize a state store.
|
||||
* @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
|
||||
* latest (rolling) aggregate for each key
|
||||
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
|
||||
|
|
|
@ -40,9 +40,8 @@ import org.scalatest.junit.JUnitSuite
|
|||
* <p>
|
||||
* Note: In the current project settings SAM type conversion is turned off as it's experimental in Scala 2.11.
|
||||
* Hence the native Java API based version is more verbose.
|
||||
*/
|
||||
class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
|
||||
with StreamToTableJoinTestData {
|
||||
*/
|
||||
class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite with StreamToTableJoinTestData {
|
||||
|
||||
private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
|
||||
|
||||
|
@ -67,7 +66,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
|
|||
|
||||
@Test def testShouldCountClicksPerRegion(): Unit = {
|
||||
|
||||
// DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced,
|
||||
// DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced,
|
||||
// Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will
|
||||
// get these instances automatically
|
||||
import Serdes._
|
||||
|
@ -84,7 +83,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
|
|||
val clicksPerRegion: KTable[String, Long] =
|
||||
userClicksStream
|
||||
|
||||
// Join the stream against the table.
|
||||
// Join the stream against the table.
|
||||
.leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
|
||||
|
||||
// Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
|
||||
|
@ -100,8 +99,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
|
|||
val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
|
||||
streams.start()
|
||||
|
||||
|
||||
val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
|
||||
val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
|
||||
produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
|
||||
|
||||
streams.close()
|
||||
|
@ -126,29 +124,32 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
|
|||
|
||||
val builder: StreamsBuilderJ = new StreamsBuilderJ()
|
||||
|
||||
val userClicksStream: KStreamJ[String, JLong] =
|
||||
val userClicksStream: KStreamJ[String, JLong] =
|
||||
builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.String, Serdes.JavaLong))
|
||||
|
||||
val userRegionsTable: KTableJ[String, String] =
|
||||
val userRegionsTable: KTableJ[String, String] =
|
||||
builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.String, Serdes.String))
|
||||
|
||||
// Join the stream against the table.
|
||||
val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
|
||||
.leftJoin(userRegionsTable,
|
||||
.leftJoin(
|
||||
userRegionsTable,
|
||||
new ValueJoiner[JLong, String, (String, JLong)] {
|
||||
def apply(clicks: JLong, region: String): (String, JLong) =
|
||||
def apply(clicks: JLong, region: String): (String, JLong) =
|
||||
(if (region == null) "UNKNOWN" else region, clicks)
|
||||
},
|
||||
Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String))
|
||||
},
|
||||
Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String)
|
||||
)
|
||||
|
||||
// Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
|
||||
val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
|
||||
.map {
|
||||
val clicksByRegion: KStreamJ[String, JLong] = userClicksJoinRegion
|
||||
.map {
|
||||
new KeyValueMapper[String, (String, JLong), KeyValue[String, JLong]] {
|
||||
def apply(k: String, regionWithClicks: (String, JLong)) = new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
|
||||
def apply(k: String, regionWithClicks: (String, JLong)) =
|
||||
new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Compute the total per region by summing the individual click counts per region.
|
||||
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
|
||||
.groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
|
||||
|
@ -157,7 +158,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
|
|||
def apply(v1: JLong, v2: JLong) = v1 + v2
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Write the (continuously updating) results to the output topic.
|
||||
clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.String, Serdes.JavaLong))
|
||||
|
||||
|
@ -165,7 +166,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
|
|||
|
||||
streams.start()
|
||||
|
||||
val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
|
||||
val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
|
||||
produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ)
|
||||
|
||||
streams.close()
|
||||
|
@ -214,17 +215,27 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
|
|||
p
|
||||
}
|
||||
|
||||
private def produceNConsume(userClicksTopic: String, userRegionsTopic: String, outputTopic: String): java.util.List[KeyValue[String, Long]] = {
|
||||
private def produceNConsume(userClicksTopic: String,
|
||||
userRegionsTopic: String,
|
||||
outputTopic: String): java.util.List[KeyValue[String, Long]] = {
|
||||
|
||||
import collection.JavaConverters._
|
||||
|
||||
|
||||
// Publish user-region information.
|
||||
val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig()
|
||||
IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic, userRegions.asJava, userRegionsProducerConfig, mockTime, false)
|
||||
IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic,
|
||||
userRegions.asJava,
|
||||
userRegionsProducerConfig,
|
||||
mockTime,
|
||||
false)
|
||||
|
||||
// Publish user-click information.
|
||||
val userClicksProducerConfig: Properties = getUserClicksProducerConfig()
|
||||
IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic, userClicks.asJava, userClicksProducerConfig, mockTime, false)
|
||||
IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic,
|
||||
userClicks.asJava,
|
||||
userClicksProducerConfig,
|
||||
mockTime,
|
||||
false)
|
||||
|
||||
// consume and verify result
|
||||
val consumerConfig = getConsumerConfig()
|
||||
|
@ -232,4 +243,3 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
|
|||
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedClicksPerRegion.size)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -58,4 +58,3 @@ trait StreamToTableJoinTestData {
|
|||
new KeyValue("asia", 124L)
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ import collection.JavaConverters._
|
|||
|
||||
/**
|
||||
* Test suite that verifies that the topology built by the Java and Scala APIs match.
|
||||
*/
|
||||
*/
|
||||
class TopologyTest extends JUnitSuite {
|
||||
|
||||
val inputTopic = "input-topic"
|
||||
|
@ -50,22 +50,22 @@ class TopologyTest extends JUnitSuite {
|
|||
def getTopologyScala(): TopologyDescription = {
|
||||
|
||||
import Serdes._
|
||||
|
||||
|
||||
val streamBuilder = new StreamsBuilder
|
||||
val textLines = streamBuilder.stream[String, String](inputTopic)
|
||||
|
||||
|
||||
val _: KStream[String, String] =
|
||||
textLines.flatMapValues(v => pattern.split(v.toLowerCase))
|
||||
|
||||
|
||||
streamBuilder.build().describe()
|
||||
}
|
||||
|
||||
|
||||
// build the Java topology
|
||||
def getTopologyJava(): TopologyDescription = {
|
||||
|
||||
val streamBuilder = new StreamsBuilderJ
|
||||
val textLines = streamBuilder.stream[String, String](inputTopic)
|
||||
|
||||
|
||||
val _: KStreamJ[String, String] = textLines.flatMapValues {
|
||||
new ValueMapper[String, java.lang.Iterable[String]] {
|
||||
def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava
|
||||
|
@ -84,15 +84,16 @@ class TopologyTest extends JUnitSuite {
|
|||
def getTopologyScala(): TopologyDescription = {
|
||||
|
||||
import Serdes._
|
||||
|
||||
|
||||
val streamBuilder = new StreamsBuilder
|
||||
val textLines = streamBuilder.stream[String, String](inputTopic)
|
||||
|
||||
|
||||
val _: KTable[String, Long] =
|
||||
textLines.flatMapValues(v => pattern.split(v.toLowerCase))
|
||||
textLines
|
||||
.flatMapValues(v => pattern.split(v.toLowerCase))
|
||||
.groupBy((k, v) => v)
|
||||
.count()
|
||||
|
||||
|
||||
streamBuilder.build().describe()
|
||||
}
|
||||
|
||||
|
@ -101,21 +102,21 @@ class TopologyTest extends JUnitSuite {
|
|||
|
||||
val streamBuilder = new StreamsBuilderJ
|
||||
val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic)
|
||||
|
||||
|
||||
val splits: KStreamJ[String, String] = textLines.flatMapValues {
|
||||
new ValueMapper[String, java.lang.Iterable[String]] {
|
||||
def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
val grouped: KGroupedStreamJ[String, String] = splits.groupBy {
|
||||
new KeyValueMapper[String, String, String] {
|
||||
def apply(k: String, v: String): String = v
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
|
||||
|
||||
|
||||
streamBuilder.build().describe()
|
||||
}
|
||||
|
||||
|
@ -128,13 +129,13 @@ class TopologyTest extends JUnitSuite {
|
|||
// build the Scala topology
|
||||
def getTopologyScala(): TopologyDescription = {
|
||||
import Serdes._
|
||||
|
||||
|
||||
val builder = new StreamsBuilder()
|
||||
|
||||
|
||||
val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
|
||||
|
||||
|
||||
val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
|
||||
|
||||
|
||||
val clicksPerRegion: KTable[String, Long] =
|
||||
userClicksStream
|
||||
.leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
|
||||
|
@ -149,32 +150,35 @@ class TopologyTest extends JUnitSuite {
|
|||
def getTopologyJava(): TopologyDescription = {
|
||||
|
||||
import java.lang.{Long => JLong}
|
||||
|
||||
|
||||
val builder: StreamsBuilderJ = new StreamsBuilderJ()
|
||||
|
||||
val userClicksStream: KStreamJ[String, JLong] =
|
||||
|
||||
val userClicksStream: KStreamJ[String, JLong] =
|
||||
builder.stream[String, JLong](userClicksTopic, Consumed.`with`(Serdes.String, Serdes.JavaLong))
|
||||
|
||||
val userRegionsTable: KTableJ[String, String] =
|
||||
|
||||
val userRegionsTable: KTableJ[String, String] =
|
||||
builder.table[String, String](userRegionsTopic, Consumed.`with`(Serdes.String, Serdes.String))
|
||||
|
||||
|
||||
// Join the stream against the table.
|
||||
val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
|
||||
.leftJoin(userRegionsTable,
|
||||
.leftJoin(
|
||||
userRegionsTable,
|
||||
new ValueJoiner[JLong, String, (String, JLong)] {
|
||||
def apply(clicks: JLong, region: String): (String, JLong) =
|
||||
def apply(clicks: JLong, region: String): (String, JLong) =
|
||||
(if (region == null) "UNKNOWN" else region, clicks)
|
||||
},
|
||||
Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String))
|
||||
|
||||
},
|
||||
Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String)
|
||||
)
|
||||
|
||||
// Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
|
||||
val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
|
||||
.map {
|
||||
val clicksByRegion: KStreamJ[String, JLong] = userClicksJoinRegion
|
||||
.map {
|
||||
new KeyValueMapper[String, (String, JLong), KeyValue[String, JLong]] {
|
||||
def apply(k: String, regionWithClicks: (String, JLong)) = new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
|
||||
def apply(k: String, regionWithClicks: (String, JLong)) =
|
||||
new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Compute the total per region by summing the individual click counts per region.
|
||||
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
|
||||
.groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
|
||||
|
|
|
@ -50,7 +50,7 @@ import ImplicitConversions._
|
|||
* <p>
|
||||
* Note: In the current project settings SAM type conversion is turned off as it's experimental in Scala 2.11.
|
||||
* Hence the native Java API based version is more verbose.
|
||||
*/
|
||||
*/
|
||||
class WordCountTest extends JUnitSuite with WordCountTestData {
|
||||
|
||||
private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
|
||||
|
@ -61,11 +61,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
|
|||
val mockTime: MockTime = cluster.time
|
||||
mockTime.setCurrentTimeMs(alignedTime)
|
||||
|
||||
|
||||
val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory())
|
||||
@Rule def testFolder: TemporaryFolder = tFolder
|
||||
|
||||
|
||||
@Before
|
||||
def startKafkaCluster(): Unit = {
|
||||
cluster.createTopic(inputTopic)
|
||||
|
@ -86,7 +83,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
|
|||
|
||||
// generate word counts
|
||||
val wordCounts: KTable[String, Long] =
|
||||
textLines.flatMapValues(v => pattern.split(v.toLowerCase))
|
||||
textLines
|
||||
.flatMapValues(v => pattern.split(v.toLowerCase))
|
||||
.groupBy((_, v) => v)
|
||||
.count()
|
||||
|
||||
|
@ -117,7 +115,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
|
|||
|
||||
// generate word counts
|
||||
val wordCounts: KTable[String, Long] =
|
||||
textLines.flatMapValues(v => pattern.split(v.toLowerCase))
|
||||
textLines
|
||||
.flatMapValues(v => pattern.split(v.toLowerCase))
|
||||
.groupBy((k, v) => v)
|
||||
.count()(Materialized.as("word-count"))
|
||||
|
||||
|
@ -139,7 +138,12 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
|
|||
@Test def testShouldCountWordsJava(): Unit = {
|
||||
|
||||
import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ}
|
||||
import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _}
|
||||
import org.apache.kafka.streams.kstream.{
|
||||
KTable => KTableJ,
|
||||
KStream => KStreamJ,
|
||||
KGroupedStream => KGroupedStreamJ,
|
||||
_
|
||||
}
|
||||
import collection.JavaConverters._
|
||||
|
||||
val streamsConfiguration = getStreamsConfiguration()
|
||||
|
@ -250,4 +254,3 @@ trait WordCountTestData {
|
|||
new KeyValue("слова", 1L)
|
||||
)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue