KAFKA-7250: switch scala transform to TransformSupplier (#5481)

#5468 introduced a breaking API change that was avoidable. This PR re-introduces the old API as deprecated and alters the API introduced by #5468 to be consistent with the other methods.

Also fixes miscellaneous syntax problems.
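For context, a minimal sketch of the caller-facing difference (illustrative code, not part of this patch; `Lowercase` and `lowercased` are assumed names):

```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.scala.kstream.KStream

object TransformExample {
  // A trivial transformer, used only to show the call-site change.
  class Lowercase extends Transformer[String, String, KeyValue[String, String]] {
    override def init(context: ProcessorContext): Unit = ()
    override def transform(key: String, value: String): KeyValue[String, String] =
      new KeyValue(key, value.toLowerCase)
    override def close(): Unit = ()
  }

  // Under #5468 the call site was: stream.transform(() => new Lowercase)
  // After this patch, transform takes a TransformerSupplier, as the Java API does:
  def lowercased(stream: KStream[String, String]): KStream[String, String] =
    stream.transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
      override def get(): Transformer[String, String, KeyValue[String, String]] = new Lowercase
    })
}
```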
John Roesler 2018-08-09 12:11:48 -05:00 committed by Guozhang Wang
parent 13a7544418
commit b1539ff62d
3 changed files with 31 additions and 28 deletions

build.gradle

@@ -1015,6 +1015,7 @@ project(':streams:streams-scala') {
testCompile libs.junit
testCompile libs.scalatest
testCompile libs.easymock
+testRuntime libs.slf4jlog4j
}

streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala

@@ -22,7 +22,7 @@ package kstream
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
-import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor}
+import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, TopicNameExtractor}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
@@ -31,8 +31,8 @@ import scala.collection.JavaConverters._
/**
* Wraps the Java class [[org.apache.kafka.streams.kstream.KStream]] and delegates method calls to the underlying Java object.
*
-* @param [K] Type of keys
-* @param [V] Type of values
+* @tparam K Type of keys
+* @tparam V Type of values
* @param inner The underlying Java abstraction for KStream
*
* @see `org.apache.kafka.streams.kstream.KStream`
@@ -167,7 +167,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
def print(printed: Printed[K, V]): Unit = inner.print(printed)
/**
-* Perform an action on each record of 'KStream`
+* Perform an action on each record of `KStream`
*
* @param action an action to perform on each record
* @see `org.apache.kafka.streams.kstream.KStream#foreach`
@@ -176,14 +176,15 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
inner.foreach((k: K, v: V) => action(k, v))
/**
-* Creates an array of {@code KStream} from this stream by branching the records in the original stream based on
+* Creates an array of `KStream` from this stream by branching the records in the original stream based on
* the supplied predicates.
*
* @param predicates the ordered list of functions that return a Boolean
* @return multiple distinct substreams of this [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#branch`
*/
-def branch(predicates: (K, V) => Boolean*): Array[KStream[K, V]] =
+//noinspection ScalaUnnecessaryParentheses
+def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] =
inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream))
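The suppressed inspection refers to the added parentheses: writing `((K, V) => Boolean)*` makes the vararg star apply to the whole function type rather than to `Boolean`. A usage sketch (stream type and predicates are illustrative):

```scala
import org.apache.kafka.streams.scala.kstream.KStream

object BranchExample {
  // Each record goes to the substream of the first predicate that matches it.
  def split(stream: KStream[String, Int]): Array[KStream[String, Int]] =
    stream.branch(
      (_, v) => v < 0,  // negatives
      (_, v) => v == 0, // zeros
      (_, _) => true    // catch-all for the remaining records
    )
}
```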
/**
@@ -211,7 +212,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* }}}
*
* @param topic the topic name
-* @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+* @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
* @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#through`
*/
@@ -243,7 +244,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* }}}
*
* @param topic the topic name
-* @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+* @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
* @see `org.apache.kafka.streams.kstream.KStream#to`
*/
def to(topic: String)(implicit produced: Produced[K, V]): Unit =
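As a reminder of how the implicit `produced` parameter is typically satisfied: with the module's default serdes and `ImplicitConversions` in scope, the `Produced` instance is derived implicitly (topic name and wrapper object are illustrative):

```scala
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._ // implicit Serde[String], Serde[Long], ...
import org.apache.kafka.streams.scala.kstream.KStream

object ToExample {
  // No explicit Produced: it is derived from the implicit serdes in scope.
  def publish(events: KStream[String, String]): Unit =
    events.to("output-topic")
}
```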
@@ -275,7 +276,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* }}}
*
* @param extractor the extractor to determine the name of the Kafka topic to write to for each record
-* @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+* @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
* @see `org.apache.kafka.streams.kstream.KStream#to`
*/
def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit =
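Dynamic routing via `TopicNameExtractor` can then look like the following sketch (extractor logic and names are assumptions):

```scala
import org.apache.kafka.streams.processor.{RecordContext, TopicNameExtractor}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.KStream

object DynamicRoutingExample {
  // Routes each record to a topic derived from its key (illustrative rule).
  val byKeyPrefix: TopicNameExtractor[String, String] =
    new TopicNameExtractor[String, String] {
      override def extract(key: String, value: String, recordContext: RecordContext): String =
        s"events-${key.take(1)}"
    }

  def publish(events: KStream[String, String]): Unit =
    events.to(byKeyPrefix)
}
```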
@@ -295,9 +296,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transform`
*/
-def transform[K1, V1](transformerSupplier: () => Transformer[K, V, KeyValue[K1, V1]],
+def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
stateStoreNames: String*): KStream[K1, V1] =
-inner.transform(transformerSupplier.asTransformerSupplier, stateStoreNames: _*)
+inner.transform(transformerSupplier, stateStoreNames: _*)
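The removed line relied on the `asTransformerSupplier` enrichment from `FunctionConversions`. Call sites written against the function-based signature from #5468 can migrate with a small shim along these lines (a hypothetical helper, not part of this patch):

```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}

object TransformerShim {
  // Wraps a Scala () => Transformer in the Java TransformerSupplier interface.
  def asSupplier[K, V, K1, V1](
      f: () => Transformer[K, V, KeyValue[K1, V1]]): TransformerSupplier[K, V, KeyValue[K1, V1]] =
    new TransformerSupplier[K, V, KeyValue[K1, V1]] {
      override def get(): Transformer[K, V, KeyValue[K1, V1]] = f()
    }
}
```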
/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
@@ -337,11 +338,12 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* In order to assign a state, the state must be created and registered
* beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
*
-* @param processorSupplier a function that generates a [[org.apache.kafka.stream.Processor]]
+* @param processorSupplier a function that generates a [[org.apache.kafka.streams.processor.Processor]]
* @param stateStoreNames the names of the state store used by the processor
* @see `org.apache.kafka.streams.kstream.KStream#process`
*/
def process(processorSupplier: () => Processor[K, V], stateStoreNames: String*): Unit = {
+//noinspection ConvertExpressionToSAM // because of the 2.11 build
val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K, V] {
override def get(): Processor[K, V] = processorSupplier()
}
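Note that `process` keeps its Scala function signature and wraps it in a Java `ProcessorSupplier` internally, as the lines above show. A usage sketch (the logging processor is illustrative):

```scala
import org.apache.kafka.streams.processor.{Processor, ProcessorContext}
import org.apache.kafka.streams.scala.kstream.KStream

object ProcessExample {
  // A terminal operation that just prints each record.
  class LoggingProcessor extends Processor[String, String] {
    override def init(context: ProcessorContext): Unit = ()
    override def process(key: String, value: String): Unit = println(s"$key -> $value")
    override def close(): Unit = ()
  }

  def attachLogger(stream: KStream[String, String]): Unit =
    stream.process(() => new LoggingProcessor)
}
```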
@@ -374,7 +376,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* // to the groupByKey call
* }}}
*
-* @param (implicit) serialized the instance of Serialized that gives the serdes
+* @param serialized the instance of Serialized that gives the serdes
* @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#groupByKey`
*/
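As with `produced` above, the implicit `serialized` is normally derived from serdes in scope, e.g. (illustrative):

```scala
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.{KGroupedStream, KStream}

object GroupByKeyExample {
  // Serialized[String, Long] is derived from the implicit serdes in scope.
  def grouped(stream: KStream[String, Long]): KGroupedStream[String, Long] =
    stream.groupByKey
}
```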
@@ -564,7 +566,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
def merge(stream: KStream[K, V]): KStream[K, V] = inner.merge(stream.inner)
/**
-* Perform an action on each record of {@code KStream}.
+* Perform an action on each record of `KStream`.
* <p>
* Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection)
* and returns an unchanged stream.
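A sketch of that contract (names are illustrative): `peek` returns the stream unchanged, so it can be chained, unlike the terminal `foreach`:

```scala
import org.apache.kafka.streams.scala.kstream.KStream

object PeekExample {
  // peek triggers the side effect and returns the same stream, so it chains.
  def logged(stream: KStream[String, String]): KStream[String, String] =
    stream.peek((k, v) => println(s"$k -> $v"))
}
```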

streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala

@@ -21,19 +21,16 @@ package org.apache.kafka.streams.scala
import java.util.regex.Pattern
-import org.scalatest.junit.JUnitSuite
+import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, KStream => KStreamJ, KTable => KTableJ, _}
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.kstream._
+import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
import org.junit.Assert._
import org.junit._
+import org.scalatest.junit.JUnitSuite
-import org.apache.kafka.streams.scala.kstream._
-import ImplicitConversions._
-import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
-import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _}
-import org.apache.kafka.streams.processor.ProcessorContext
-import collection.JavaConverters._
+import _root_.scala.collection.JavaConverters._
/**
* Test suite that verifies that the topology built by the Java and Scala APIs match.
@@ -207,17 +204,20 @@ class TopologyTest extends JUnitSuite {
val streamBuilder = new StreamsBuilder
val textLines = streamBuilder.stream[String, String](inputTopic)
+//noinspection ConvertExpressionToSAM due to 2.11 build
val _: KTable[String, Long] =
textLines
-.transform(
-() =>
+.transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
+override def get(): Transformer[String, String, KeyValue[String, String]] =
new Transformer[String, String, KeyValue[String, String]] {
override def init(context: ProcessorContext): Unit = Unit
override def transform(key: String, value: String): KeyValue[String, String] =
new KeyValue(key, value.toLowerCase)
override def close(): Unit = Unit
}
-)
+})
.groupBy((k, v) => v)
.count()
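The `//noinspection ConvertExpressionToSAM` hints above exist because the Scala 2.11 cross-build cannot rely on SAM conversion. On Scala 2.12+ the anonymous `TransformerSupplier` could be written as a function literal; a sketch, assuming only the `TransformerSupplier` overload is in scope:

```scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.ProcessorContext

object SamExample {
  // SAM conversion turns the function literal into a TransformerSupplier.
  val lowercaseSupplier: TransformerSupplier[String, String, KeyValue[String, String]] =
    () =>
      new Transformer[String, String, KeyValue[String, String]] {
        override def init(context: ProcessorContext): Unit = ()
        override def transform(key: String, value: String): KeyValue[String, String] =
          new KeyValue(key, value.toLowerCase)
        override def close(): Unit = ()
      }
}
```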