KAFKA-7399: KIP-366, Make FunctionConversions deprecated (#5562)

Reviewers: John Roesler <vvcephei@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Joan Goyeau 2018-09-19 17:10:15 +01:00 committed by Guozhang Wang
parent 88823c6016
commit af80dccc7d
9 changed files with 130 additions and 14 deletions

View File

@ -284,6 +284,12 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="EQ_UNUSUAL"/>
</Match>
<Match>
<Package name="org.apache.kafka.streams.scala"/>
<Source name="FunctionsCompatConversions.scala"/>
<Bug pattern="EQ_UNUSUAL"/>
</Match>
- <Match>
<Package name="org.apache.kafka.streams.scala"/>
<Or>

View File

@ -24,14 +24,7 @@ import org.apache.kafka.streams.kstream._
import scala.collection.JavaConverters._
import java.lang.{Iterable => JIterable}
/**
* Implicit classes that offer conversions of Scala function literals to
* SAM (Single Abstract Method) objects in Java. These make the Scala APIs much
* more expressive, with less boilerplate and more succinct.
* <p>
* For Scala 2.11, most of these conversions need to be invoked explicitly, as Scala 2.11 does not
* have full support for SAM types.
*/
@deprecated("This object is for internal use only", since = "2.1.0")
object FunctionConversions {
implicit private[scala] class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.scala
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream._
import scala.collection.JavaConverters._
import java.lang.{Iterable => JIterable}
/**
* Implicit classes that offer conversions of Scala function literals to
* SAM (Single Abstract Method) objects in Java. These make the Scala APIs much
* more expressive, with less boilerplate and more succinct.
* <p>
* For Scala 2.11, most of these conversions need to be invoked explicitly, as Scala 2.11 does not
* have full support for SAM types.
*/
private[scala] object FunctionsCompatConversions {
implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
def asForeachAction: ForeachAction[K, V] = new ForeachAction[K, V] {
override def apply(key: K, value: V): Unit = p(key, value)
}
}
implicit class PredicateFromFunction[K, V](val p: (K, V) => Boolean) extends AnyVal {
def asPredicate: Predicate[K, V] = new Predicate[K, V] {
override def test(key: K, value: V): Boolean = p(key, value)
}
}
implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal {
def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, VR] {
override def apply(key: T, value: U): VR = f(key, value)
}
def asValueJoiner: ValueJoiner[T, U, VR] = new ValueJoiner[T, U, VR] {
override def apply(value1: T, value2: U): VR = f(value1, value2)
}
}
implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal {
def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = new KeyValueMapper[K, V, KeyValue[KR, VR]] {
override def apply(key: K, value: V): KeyValue[KR, VR] = {
val (kr, vr) = f(key, value)
KeyValue.pair(kr, vr)
}
}
}
implicit class ValueMapperFromFunction[V, VR](val f: V => VR) extends AnyVal {
def asValueMapper: ValueMapper[V, VR] = new ValueMapper[V, VR] {
override def apply(value: V): VR = f(value)
}
}
implicit class FlatValueMapperFromFunction[V, VR](val f: V => Iterable[VR]) extends AnyVal {
def asValueMapper: ValueMapper[V, JIterable[VR]] = new ValueMapper[V, JIterable[VR]] {
override def apply(value: V): JIterable[VR] = f(value).asJava
}
}
implicit class ValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => VR) extends AnyVal {
def asValueMapperWithKey: ValueMapperWithKey[K, V, VR] = new ValueMapperWithKey[K, V, VR] {
override def apply(readOnlyKey: K, value: V): VR = f(readOnlyKey, value)
}
}
implicit class FlatValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => Iterable[VR]) extends AnyVal {
def asValueMapperWithKey: ValueMapperWithKey[K, V, JIterable[VR]] = new ValueMapperWithKey[K, V, JIterable[VR]] {
override def apply(readOnlyKey: K, value: V): JIterable[VR] = f(readOnlyKey, value).asJava
}
}
implicit class AggregatorFromFunction[K, V, VA](val f: (K, V, VA) => VA) extends AnyVal {
def asAggregator: Aggregator[K, V, VA] = new Aggregator[K, V, VA] {
override def apply(key: K, value: V, aggregate: VA): VA = f(key, value, aggregate)
}
}
implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends AnyVal {
def asMerger: Merger[K, VR] = new Merger[K, VR] {
override def apply(aggKey: K, aggOne: VR, aggTwo: VR): VR = f(aggKey, aggOne, aggTwo)
}
}
implicit class ReducerFromFunction[V](val f: (V, V) => V) extends AnyVal {
def asReducer: Reducer[V] = new Reducer[V] {
override def apply(value1: V, value2: V): V = f(value1, value2)
}
}
implicit class InitializerFromFunction[VA](val f: () => VA) extends AnyVal {
def asInitializer: Initializer[VA] = new Initializer[VA] {
override def apply(): VA = f()
}
}
implicit class TransformerSupplierFromFunction[K, V, VO](val f: () => Transformer[K, V, VO]) extends AnyVal {
def asTransformerSupplier: TransformerSupplier[K, V, VO] = new TransformerSupplier[K, V, VO] {
override def get(): Transformer[K, V, VO] = f()
}
}
}

View File

@ -22,7 +22,7 @@ package kstream
import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
import org.apache.kafka.streams.scala.FunctionsCompatConversions._
/**
* Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.

View File

@ -22,7 +22,7 @@ package kstream
import org.apache.kafka.streams.kstream.{KGroupedTable => KGroupedTableJ, _}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
import org.apache.kafka.streams.scala.FunctionsCompatConversions._
/**
* Wraps the Java class KGroupedTable and delegates method calls to the underlying Java object.

View File

@ -24,7 +24,7 @@ import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
import org.apache.kafka.streams.scala.FunctionsCompatConversions._
import scala.collection.JavaConverters._

View File

@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{KTable => KTableJ, _}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
import org.apache.kafka.streams.scala.FunctionsCompatConversions._
import org.apache.kafka.streams.state.KeyValueStore
/**

View File

@ -22,7 +22,7 @@ package kstream
import org.apache.kafka.streams.kstream.{SessionWindowedKStream => SessionWindowedKStreamJ, _}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
import org.apache.kafka.streams.scala.FunctionsCompatConversions._
/**
* Wraps the Java class SessionWindowedKStream and delegates method calls to the underlying Java object.

View File

@ -22,7 +22,7 @@ package kstream
import org.apache.kafka.streams.kstream.{TimeWindowedKStream => TimeWindowedKStreamJ, _}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
import org.apache.kafka.streams.scala.FunctionsCompatConversions._
/**
* Wraps the Java class TimeWindowedKStream and delegates method calls to the underlying Java object.