From af80dccc7dfb18c41d17d590a316cb6803294cbb Mon Sep 17 00:00:00 2001 From: Joan Goyeau Date: Wed, 19 Sep 2018 17:10:15 +0100 Subject: [PATCH] KAFKA-7399: KIP-366, Make FunctionConversions deprecated (#5562) Reviewers: John Roesler , Guozhang Wang --- gradle/spotbugs-exclude.xml | 6 + .../streams/scala/FunctionConversions.scala | 9 +- .../scala/FunctionsCompatConversions.scala | 117 ++++++++++++++++++ .../scala/kstream/KGroupedStream.scala | 2 +- .../streams/scala/kstream/KGroupedTable.scala | 2 +- .../kafka/streams/scala/kstream/KStream.scala | 2 +- .../kafka/streams/scala/kstream/KTable.scala | 2 +- .../kstream/SessionWindowedKStream.scala | 2 +- .../scala/kstream/TimeWindowedKStream.scala | 2 +- 9 files changed, 130 insertions(+), 14 deletions(-) create mode 100644 streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 99a657e63b5..d83c4c4375f 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -284,6 +284,12 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read + + + + + + - diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala index 566ba22e615..26ba8004334 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala @@ -24,14 +24,7 @@ import org.apache.kafka.streams.kstream._ import scala.collection.JavaConverters._ import java.lang.{Iterable => JIterable} -/** - * Implicit classes that offer conversions of Scala function literals to - * SAM (Single Abstract Method) objects in Java. These make the Scala APIs much - * more expressive, with less boilerplate and more succinct. - *

- * For Scala 2.11, most of these conversions need to be invoked explicitly, as Scala 2.11 does not - * have full support for SAM types. - */ +@deprecated("This object is for internal use only", since = "2.1.0") object FunctionConversions { implicit private[scala] class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal { diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala new file mode 100644 index 00000000000..754f1cc1aad --- /dev/null +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.scala + +import org.apache.kafka.streams.KeyValue +import org.apache.kafka.streams.kstream._ +import scala.collection.JavaConverters._ +import java.lang.{Iterable => JIterable} + +/** + * Implicit classes that offer conversions of Scala function literals to + * SAM (Single Abstract Method) objects in Java. These make the Scala APIs much + * more expressive, with less boilerplate and more succinct. + *

+ * For Scala 2.11, most of these conversions need to be invoked explicitly, as Scala 2.11 does not + * have full support for SAM types. + */ +private[scala] object FunctionsCompatConversions { + + implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal { + def asForeachAction: ForeachAction[K, V] = new ForeachAction[K, V] { + override def apply(key: K, value: V): Unit = p(key, value) + } + } + + implicit class PredicateFromFunction[K, V](val p: (K, V) => Boolean) extends AnyVal { + def asPredicate: Predicate[K, V] = new Predicate[K, V] { + override def test(key: K, value: V): Boolean = p(key, value) + } + } + + implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal { + def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, VR] { + override def apply(key: T, value: U): VR = f(key, value) + } + def asValueJoiner: ValueJoiner[T, U, VR] = new ValueJoiner[T, U, VR] { + override def apply(value1: T, value2: U): VR = f(value1, value2) + } + } + + implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal { + def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = new KeyValueMapper[K, V, KeyValue[KR, VR]] { + override def apply(key: K, value: V): KeyValue[KR, VR] = { + val (kr, vr) = f(key, value) + KeyValue.pair(kr, vr) + } + } + } + + implicit class ValueMapperFromFunction[V, VR](val f: V => VR) extends AnyVal { + def asValueMapper: ValueMapper[V, VR] = new ValueMapper[V, VR] { + override def apply(value: V): VR = f(value) + } + } + + implicit class FlatValueMapperFromFunction[V, VR](val f: V => Iterable[VR]) extends AnyVal { + def asValueMapper: ValueMapper[V, JIterable[VR]] = new ValueMapper[V, JIterable[VR]] { + override def apply(value: V): JIterable[VR] = f(value).asJava + } + } + + implicit class ValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => VR) extends AnyVal { + def asValueMapperWithKey: ValueMapperWithKey[K, V, VR] = new ValueMapperWithKey[K, V, VR] { + override def apply(readOnlyKey: K, value: V): VR = f(readOnlyKey, value) + } + } + + implicit class FlatValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => Iterable[VR]) extends AnyVal { + def asValueMapperWithKey: ValueMapperWithKey[K, V, JIterable[VR]] = new ValueMapperWithKey[K, V, JIterable[VR]] { + override def apply(readOnlyKey: K, value: V): JIterable[VR] = f(readOnlyKey, value).asJava + } + } + + implicit class AggregatorFromFunction[K, V, VA](val f: (K, V, VA) => VA) extends AnyVal { + def asAggregator: Aggregator[K, V, VA] = new Aggregator[K, V, VA] { + override def apply(key: K, value: V, aggregate: VA): VA = f(key, value, aggregate) + } + } + + implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends AnyVal { + def asMerger: Merger[K, VR] = new Merger[K, VR] { + override def apply(aggKey: K, aggOne: VR, aggTwo: VR): VR = f(aggKey, aggOne, aggTwo) + } + } + + implicit class ReducerFromFunction[V](val f: (V, V) => V) extends AnyVal { + def asReducer: Reducer[V] = new Reducer[V] { + override def apply(value1: V, value2: V): V = f(value1, value2) + } + } + + implicit class InitializerFromFunction[VA](val f: () => VA) extends AnyVal { + def asInitializer: Initializer[VA] = new Initializer[VA] { + override def apply(): VA = f() + } + } + + implicit class TransformerSupplierFromFunction[K, V, VO](val f: () => Transformer[K, V, VO]) extends AnyVal { + def asTransformerSupplier: TransformerSupplier[K, V, VO] = new TransformerSupplier[K, V, VO] { + override def get(): Transformer[K, V, VO] = f() + } + } +} diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala index 5d0f05efcae..5168805d3eb 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala @@ -22,7 +22,7 @@ package kstream import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _} import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.FunctionConversions._ +import org.apache.kafka.streams.scala.FunctionsCompatConversions._ /** * Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object. diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala index 56f84e3838b..54b1e1f94bb 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala @@ -22,7 +22,7 @@ package kstream import org.apache.kafka.streams.kstream.{KGroupedTable => KGroupedTableJ, _} import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.FunctionConversions._ +import org.apache.kafka.streams.scala.FunctionsCompatConversions._ /** * Wraps the Java class KGroupedTable and delegates method calls to the underlying Java object. diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index 8e4c9aa7eca..d54ac5a7d1f 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -24,7 +24,7 @@ import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _} import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor} import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.FunctionConversions._ +import org.apache.kafka.streams.scala.FunctionsCompatConversions._ import scala.collection.JavaConverters._ diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala index d41496fb21c..881c8e09410 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala @@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.Serde import org.apache.kafka.common.utils.Bytes import org.apache.kafka.streams.kstream.{KTable => KTableJ, _} import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.FunctionConversions._ +import org.apache.kafka.streams.scala.FunctionsCompatConversions._ import org.apache.kafka.streams.state.KeyValueStore /** diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala index a6027677edb..6571df9bbeb 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala @@ -22,7 +22,7 @@ package kstream import org.apache.kafka.streams.kstream.{SessionWindowedKStream => SessionWindowedKStreamJ, _} import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.FunctionConversions._ +import org.apache.kafka.streams.scala.FunctionsCompatConversions._ /** * Wraps the Java class SessionWindowedKStream and delegates method calls to the underlying Java object. diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala index c54ba4f4bd8..d84416e3908 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala @@ -22,7 +22,7 @@ package kstream import org.apache.kafka.streams.kstream.{TimeWindowedKStream => TimeWindowedKStreamJ, _} import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.FunctionConversions._ +import org.apache.kafka.streams.scala.FunctionsCompatConversions._ /** * Wraps the Java class TimeWindowedKStream and delegates method calls to the underlying Java object.