From a02b19cb77084a573a25be1b75fc195b9a9c1f9b Mon Sep 17 00:00:00 2001 From: Josep Prat Date: Thu, 27 May 2021 11:30:15 +0200 Subject: [PATCH] KAFKA-12796: Removal of deprecated classes under streams-scala (#10710) Removes previously deprecated methods in older KIPs Reviewers: Bruno Cadonna --- gradle/spotbugs-exclude.xml | 6 - .../streams/scala/FunctionConversions.scala | 83 ------------ .../kafka/streams/scala/kstream/KTable.scala | 2 +- .../streams/scala/kstream/Suppressed.scala | 128 ------------------ .../scala/kstream/SuppressedTest.scala | 93 ------------- 5 files changed, 1 insertion(+), 311 deletions(-) delete mode 100644 streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala delete mode 100644 streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala delete mode 100644 streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index ab60dfdc7d0..a4e894cccc2 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -421,12 +421,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - - - - - - diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala deleted file mode 100644 index 892419a6cd8..00000000000 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.scala - -import org.apache.kafka.streams.KeyValue -import org.apache.kafka.streams.kstream._ -import scala.jdk.CollectionConverters._ -import java.lang.{Iterable => JIterable} - -@deprecated("This object is for internal use only", since = "2.1.0") -object FunctionConversions { - - implicit private[scala] class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal { - def asForeachAction: ForeachAction[K, V] = (key, value) => p(key, value) - } - - implicit class PredicateFromFunction[K, V](val p: (K, V) => Boolean) extends AnyVal { - def asPredicate: Predicate[K, V] = (key: K, value: V) => p(key, value) - } - - implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal { - def asKeyValueMapper: KeyValueMapper[T, U, VR] = (key: T, value: U) => f(key, value) - def asValueJoiner: ValueJoiner[T, U, VR] = (value1: T, value2: U) => f(value1, value2) - } - - implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal { - def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = (key: K, value: V) => { - val (kr, vr) = f(key, value) - KeyValue.pair(kr, vr) - } - } - - implicit class ValueMapperFromFunction[V, VR](val f: V => VR) extends AnyVal { - def asValueMapper: ValueMapper[V, VR] = (value: V) => f(value) - } - - implicit class FlatValueMapperFromFunction[V, VR](val f: V => Iterable[VR]) extends AnyVal { - def asValueMapper: ValueMapper[V, JIterable[VR]] = (value: V) => f(value).asJava - } - - implicit class ValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => VR) extends AnyVal { - def asValueMapperWithKey: ValueMapperWithKey[K, V, VR] = (readOnlyKey: K, value: V) => f(readOnlyKey, value) - } - - implicit class FlatValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => Iterable[VR]) extends AnyVal { - def asValueMapperWithKey: ValueMapperWithKey[K, V, JIterable[VR]] = - (readOnlyKey: K, value: V) => f(readOnlyKey, value).asJava - } - - implicit class AggregatorFromFunction[K, V, VA](val f: (K, V, VA) => VA) extends AnyVal { - def asAggregator: Aggregator[K, V, VA] = (key: K, value: V, aggregate: VA) => f(key, value, aggregate) - } - - implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends AnyVal { - def asMerger: Merger[K, VR] = (aggKey: K, aggOne: VR, aggTwo: VR) => f(aggKey, aggOne, aggTwo) - } - - implicit class ReducerFromFunction[V](val f: (V, V) => V) extends AnyVal { - def asReducer: Reducer[V] = (value1: V, value2: V) => f(value1, value2) - } - - implicit class InitializerFromFunction[VA](val f: () => VA) extends AnyVal { - def asInitializer: Initializer[VA] = () => f() - } - - implicit class TransformerSupplierFromFunction[K, V, VO](val f: () => Transformer[K, V, VO]) extends AnyVal { - def asTransformerSupplier: TransformerSupplier[K, V, VO] = () => f() - } -} diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala index ccabeb7ee2b..7aba69d50e7 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala @@ -304,7 +304,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) { new KStream(inner.toStream[KR](mapper.asKeyValueMapper, named)) /** - * Suppress some updates from this changelog stream, determined by the supplied [[Suppressed]] configuration. + * Suppress some updates from this changelog stream, determined by the supplied [[org.apache.kafka.streams.kstream.Suppressed]] configuration. * * This controls what updates downstream table and stream operations will receive. * diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala deleted file mode 100644 index 22b20b98cd1..00000000000 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.scala.kstream - -import java.time.Duration - -import org.apache.kafka.streams.kstream.{Windowed, Suppressed => SupressedJ} -import org.apache.kafka.streams.kstream.Suppressed.{ - EagerBufferConfig, - StrictBufferConfig, - BufferConfig => BufferConfigJ -} -import org.apache.kafka.streams.kstream.internals.suppress.{ - EagerBufferConfigImpl, - FinalResultsSuppressionBuilder, - StrictBufferConfigImpl, - SuppressedInternal -} - -/** - * Duplicates the static factory methods inside the Java interface [[org.apache.kafka.streams.kstream.Suppressed]]. - * - * This was required for compatibility w/ Scala 2.11 + Java 1.8 because the Scala 2.11 compiler doesn't support the use - * of static methods inside Java interfaces. We have since dropped Scala 2.11 support. - */ -@deprecated(message = "Use org.apache.kafka.streams.kstream.Suppressed", since = "2.5") -object Suppressed { - - /** - * Configure the suppression to emit only the "final results" from the window. - * - * By default all Streams operators emit results whenever new results are available. - * This includes windowed operations. - * - * This configuration will instead emit just one result per key for each window, guaranteeing - * to deliver only the final result. This option is suitable for use cases in which the business logic - * requires a hard guarantee that only the final result is propagated. For example, sending alerts. - * - * To accomplish this, the operator will buffer events from the window until the window close (that is, - * until the end-time passes, and additionally until the grace period expires). Since windowed operators - * are required to reject late events for a window whose grace period is expired, there is an additional - * guarantee that the final results emitted from this suppression will match any queriable state upstream. - * - * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results. - * This is required to be a "strict" config, since it would violate the "final results" - * property to emit early and then issue an update later. - * @tparam K The [[Windowed]] key type for the KTable to apply this suppression to. - * @return a "final results" mode suppression configuration - * @see [[org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit]] - */ - def untilWindowCloses[K](bufferConfig: StrictBufferConfig): SupressedJ[Windowed[K]] = - new FinalResultsSuppressionBuilder[Windowed[K]](null, bufferConfig) - - /** - * Configure the suppression to wait `timeToWaitForMoreEvents` amount of time after receiving a record - * before emitting it further downstream. If another record for the same key arrives in the mean time, it replaces - * the first record in the buffer but does not re-start the timer. - * - * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events. - * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results. - * @tparam K The key type for the KTable to apply this suppression to. - * @return a suppression configuration - * @see [[org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit]] - */ - def untilTimeLimit[K](timeToWaitForMoreEvents: Duration, bufferConfig: BufferConfigJ[_]): SupressedJ[K] = - new SuppressedInternal[K](null, timeToWaitForMoreEvents, bufferConfig, null, false) - - /** - * Duplicates the static factory methods inside the Java interface - * [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig]]. - */ - object BufferConfig { - - /** - * Create a size-constrained buffer in terms of the maximum number of keys it will store. - * - * @param recordLimit maximum number of keys to buffer. - * @return size-constrained buffer in terms of the maximum number of keys it will store. - * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords]] - */ - def maxRecords(recordLimit: Long): EagerBufferConfig = - new EagerBufferConfigImpl(recordLimit, Long.MaxValue) - - /** - * Create a size-constrained buffer in terms of the maximum number of bytes it will use. - * - * @param byteLimit maximum number of bytes to buffer. - * @return size-constrained buffer in terms of the maximum number of bytes it will use. - * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes]] - */ - def maxBytes(byteLimit: Long): EagerBufferConfig = - new EagerBufferConfigImpl(Long.MaxValue, byteLimit) - - /** - * Create a buffer unconstrained by size (either keys or bytes). - * - * As a result, the buffer will consume as much memory as it needs, dictated by the time bound. - * - * If there isn't enough heap available to meet the demand, the application will encounter an - * [[OutOfMemoryError]] and shut down (not guaranteed to be a graceful exit). Also, note that - * JVM processes under extreme memory pressure may exhibit poor GC behavior. - * - * This is a convenient option if you doubt that your buffer will be that large, but also don't - * wish to pick particular constraints, such as in testing. - * - * This buffer is "strict" in the sense that it will enforce the time bound or crash. - * It will never emit early. - * - * @return a buffer unconstrained by size (either keys or bytes). - * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded]] - */ - def unbounded(): StrictBufferConfig = new StrictBufferConfigImpl() - } -} diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala deleted file mode 100644 index 86936a21eca..00000000000 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.scala.kstream - -import org.apache.kafka.streams.kstream.internals.suppress._ -import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test - -import java.time.Duration - -@deprecated(message = "org.apache.kafka.streams.scala.kstream.Suppressed has been deprecated", since = "2.5") -class SuppressedTest { - - @Test - def testProduceCorrectSuppressionUntilWindowCloses(): Unit = { - val bufferConfig = BufferConfig.unbounded() - val suppression = Suppressed.untilWindowCloses[String](bufferConfig) - assertEquals(suppression, new FinalResultsSuppressionBuilder(null, bufferConfig)) - assertEquals(suppression.withName("soup"), new FinalResultsSuppressionBuilder("soup", bufferConfig)) - } - - @Test - def testProduceCorrectSuppressionUntilTimeLimit(): Unit = { - val bufferConfig = BufferConfig.unbounded() - val duration = Duration.ofMillis(1) - assertEquals(Suppressed.untilTimeLimit[String](duration, bufferConfig), - new SuppressedInternal[String](null, duration, bufferConfig, null, false)) - } - - @Test - def testProduceCorrectBufferConfigWithMaxRecords(): Unit = { - assertEquals(BufferConfig.maxRecords(4), new EagerBufferConfigImpl(4, Long.MaxValue)) - assertEquals(BufferConfig.maxRecords(4).withMaxBytes(5), new EagerBufferConfigImpl(4, 5)) - } - - @Test - def testProduceCorrectBufferConfigWithMaxBytes(): Unit = { - assertEquals(BufferConfig.maxBytes(4), new EagerBufferConfigImpl(Long.MaxValue, 4)) - assertEquals(BufferConfig.maxBytes(4).withMaxRecords(5), new EagerBufferConfigImpl(5, 4)) - } - - @Test - def testProduceCorrectBufferConfigWithUnbounded(): Unit = - assertEquals(BufferConfig.unbounded(), - new StrictBufferConfigImpl(Long.MaxValue, Long.MaxValue, BufferFullStrategy.SHUT_DOWN)) - - @Test - def testSupportLongChainsOfFactoryMethods(): Unit = { - val bc1 = BufferConfig - .unbounded() - .emitEarlyWhenFull() - .withMaxRecords(3L) - .withMaxBytes(4L) - .withMaxRecords(5L) - .withMaxBytes(6L) - assertEquals(new EagerBufferConfigImpl(5L, 6L), bc1) - assertEquals(new StrictBufferConfigImpl(5L, 6L, BufferFullStrategy.SHUT_DOWN), bc1.shutDownWhenFull()) - - val bc2 = BufferConfig - .maxBytes(4) - .withMaxRecords(5) - .withMaxBytes(6) - .withNoBound() - .withMaxBytes(7) - .withMaxRecords(8) - - assertEquals(new StrictBufferConfigImpl(8L, 7L, BufferFullStrategy.SHUT_DOWN), bc2) - assertEquals(BufferConfig.unbounded(), bc2.withNoBound()) - - val bc3 = BufferConfig - .maxRecords(5L) - .withMaxBytes(10L) - .emitEarlyWhenFull() - .withMaxRecords(11L) - - assertEquals(new EagerBufferConfigImpl(11L, 10L), bc3) - } -}