mirror of https://github.com/apache/kafka.git
KAFKA-12796: Removal of deprecated classes under streams-scala (#10710)
Removes methods previously deprecated in older KIPs.

Reviewers: Bruno Cadonna <cadonna@apache.org>
parent 56d9482462
commit a02b19cb77
@@ -421,12 +421,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
         <Bug pattern="NN_NAKED_NOTIFY"/>
     </Match>

-    <Match>
-        <Package name="org.apache.kafka.streams.scala"/>
-        <Source name="FunctionConversions.scala"/>
-        <Bug pattern="EQ_UNUSUAL"/>
-    </Match>
-
     <Match>
         <Package name="org.apache.kafka.streams.scala"/>
         <Source name="FunctionsCompatConversions.scala"/>
@@ -1,83 +0,0 @@ (deleted file: FunctionConversions.scala)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.scala

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream._
import scala.jdk.CollectionConverters._
import java.lang.{Iterable => JIterable}

@deprecated("This object is for internal use only", since = "2.1.0")
object FunctionConversions {

  implicit private[scala] class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
    def asForeachAction: ForeachAction[K, V] = (key, value) => p(key, value)
  }

  implicit class PredicateFromFunction[K, V](val p: (K, V) => Boolean) extends AnyVal {
    def asPredicate: Predicate[K, V] = (key: K, value: V) => p(key, value)
  }

  implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal {
    def asKeyValueMapper: KeyValueMapper[T, U, VR] = (key: T, value: U) => f(key, value)
    def asValueJoiner: ValueJoiner[T, U, VR] = (value1: T, value2: U) => f(value1, value2)
  }

  implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal {
    def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = (key: K, value: V) => {
      val (kr, vr) = f(key, value)
      KeyValue.pair(kr, vr)
    }
  }

  implicit class ValueMapperFromFunction[V, VR](val f: V => VR) extends AnyVal {
    def asValueMapper: ValueMapper[V, VR] = (value: V) => f(value)
  }

  implicit class FlatValueMapperFromFunction[V, VR](val f: V => Iterable[VR]) extends AnyVal {
    def asValueMapper: ValueMapper[V, JIterable[VR]] = (value: V) => f(value).asJava
  }

  implicit class ValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => VR) extends AnyVal {
    def asValueMapperWithKey: ValueMapperWithKey[K, V, VR] = (readOnlyKey: K, value: V) => f(readOnlyKey, value)
  }

  implicit class FlatValueMapperWithKeyFromFunction[K, V, VR](val f: (K, V) => Iterable[VR]) extends AnyVal {
    def asValueMapperWithKey: ValueMapperWithKey[K, V, JIterable[VR]] =
      (readOnlyKey: K, value: V) => f(readOnlyKey, value).asJava
  }

  implicit class AggregatorFromFunction[K, V, VA](val f: (K, V, VA) => VA) extends AnyVal {
    def asAggregator: Aggregator[K, V, VA] = (key: K, value: V, aggregate: VA) => f(key, value, aggregate)
  }

  implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends AnyVal {
    def asMerger: Merger[K, VR] = (aggKey: K, aggOne: VR, aggTwo: VR) => f(aggKey, aggOne, aggTwo)
  }

  implicit class ReducerFromFunction[V](val f: (V, V) => V) extends AnyVal {
    def asReducer: Reducer[V] = (value1: V, value2: V) => f(value1, value2)
  }

  implicit class InitializerFromFunction[VA](val f: () => VA) extends AnyVal {
    def asInitializer: Initializer[VA] = () => f()
  }

  implicit class TransformerSupplierFromFunction[K, V, VO](val f: () => Transformer[K, V, VO]) extends AnyVal {
    def asTransformerSupplier: TransformerSupplier[K, V, VO] = () => f()
  }
}
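For reference, the removed object existed only to adapt Scala functions to the Java Streams SAM interfaces; its internal counterpart FunctionsCompatConversions (still covered by the spotbugs exclusion above) plays the same role for the Scala DSL. Below is a minimal, self-contained sketch of that adapter pattern, reusing one of the removed implicit classes; the object name and example values are illustrative, not part of this commit.

import org.apache.kafka.streams.kstream.Predicate

object FunctionConversionsSketch {
  // The removed object wrapped a Scala function in a value class exposing an
  // `asX` method that returns the corresponding Java functional interface.
  implicit class PredicateFromFunction[K, V](val p: (K, V) => Boolean) extends AnyVal {
    def asPredicate: Predicate[K, V] = (key: K, value: V) => p(key, value)
  }

  def main(args: Array[String]): Unit = {
    // Adapt a plain Scala function to the Java Predicate expected by the Java KStream API.
    val nonEmptyValue: Predicate[String, String] = ((_: String, v: String) => v.nonEmpty).asPredicate
    println(nonEmptyValue.test("key", "value")) // prints: true
  }
}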
@@ -304,7 +304,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
     new KStream(inner.toStream[KR](mapper.asKeyValueMapper, named))

   /**
-   * Suppress some updates from this changelog stream, determined by the supplied [[Suppressed]] configuration.
+   * Suppress some updates from this changelog stream, determined by the supplied [[org.apache.kafka.streams.kstream.Suppressed]] configuration.
    *
    * This controls what updates downstream table and stream operations will receive.
    *
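Since the Scala KTable's suppress scaladoc now points at the Java Suppressed type directly, a usage sketch may help. This assumes the kafka-streams-scala artifact; the topic names, the one-minute bound, and the 1000-record buffer are illustrative placeholders, not values from this commit.

import java.time.Duration

import org.apache.kafka.streams.kstream.Suppressed
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

object SuppressUsageSketch {
  def main(args: Array[String]): Unit = {
    val builder = new StreamsBuilder()

    // Count records per key, then hold each key's updates back for up to one minute,
    // passing the Java Suppressed configuration straight to the Scala KTable.
    builder
      .stream[String, String]("input-topic")
      .groupByKey
      .count()
      .suppress(Suppressed.untilTimeLimit[String](Duration.ofMinutes(1), BufferConfig.maxRecords(1000L)))
      .toStream
      .to("output-topic")

    // Only builds and prints the topology; no broker is needed for this sketch.
    println(builder.build().describe())
  }
}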
@@ -1,128 +0,0 @@ (deleted file: Suppressed.scala)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.scala.kstream

import java.time.Duration

import org.apache.kafka.streams.kstream.{Windowed, Suppressed => SupressedJ}
import org.apache.kafka.streams.kstream.Suppressed.{
  EagerBufferConfig,
  StrictBufferConfig,
  BufferConfig => BufferConfigJ
}
import org.apache.kafka.streams.kstream.internals.suppress.{
  EagerBufferConfigImpl,
  FinalResultsSuppressionBuilder,
  StrictBufferConfigImpl,
  SuppressedInternal
}

/**
 * Duplicates the static factory methods inside the Java interface [[org.apache.kafka.streams.kstream.Suppressed]].
 *
 * This was required for compatibility w/ Scala 2.11 + Java 1.8 because the Scala 2.11 compiler doesn't support the use
 * of static methods inside Java interfaces. We have since dropped Scala 2.11 support.
 */
@deprecated(message = "Use org.apache.kafka.streams.kstream.Suppressed", since = "2.5")
object Suppressed {

  /**
   * Configure the suppression to emit only the "final results" from the window.
   *
   * By default all Streams operators emit results whenever new results are available.
   * This includes windowed operations.
   *
   * This configuration will instead emit just one result per key for each window, guaranteeing
   * to deliver only the final result. This option is suitable for use cases in which the business logic
   * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
   *
   * To accomplish this, the operator will buffer events from the window until the window close (that is,
   * until the end-time passes, and additionally until the grace period expires). Since windowed operators
   * are required to reject late events for a window whose grace period is expired, there is an additional
   * guarantee that the final results emitted from this suppression will match any queriable state upstream.
   *
   * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
   *                     This is required to be a "strict" config, since it would violate the "final results"
   *                     property to emit early and then issue an update later.
   * @tparam K The [[Windowed]] key type for the KTable to apply this suppression to.
   * @return a "final results" mode suppression configuration
   * @see [[org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit]]
   */
  def untilWindowCloses[K](bufferConfig: StrictBufferConfig): SupressedJ[Windowed[K]] =
    new FinalResultsSuppressionBuilder[Windowed[K]](null, bufferConfig)

  /**
   * Configure the suppression to wait `timeToWaitForMoreEvents` amount of time after receiving a record
   * before emitting it further downstream. If another record for the same key arrives in the mean time, it replaces
   * the first record in the buffer but does <em>not</em> re-start the timer.
   *
   * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
   * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
   * @tparam K The key type for the KTable to apply this suppression to.
   * @return a suppression configuration
   * @see [[org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit]]
   */
  def untilTimeLimit[K](timeToWaitForMoreEvents: Duration, bufferConfig: BufferConfigJ[_]): SupressedJ[K] =
    new SuppressedInternal[K](null, timeToWaitForMoreEvents, bufferConfig, null, false)

  /**
   * Duplicates the static factory methods inside the Java interface
   * [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig]].
   */
  object BufferConfig {

    /**
     * Create a size-constrained buffer in terms of the maximum number of keys it will store.
     *
     * @param recordLimit maximum number of keys to buffer.
     * @return size-constrained buffer in terms of the maximum number of keys it will store.
     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords]]
     */
    def maxRecords(recordLimit: Long): EagerBufferConfig =
      new EagerBufferConfigImpl(recordLimit, Long.MaxValue)

    /**
     * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
     *
     * @param byteLimit maximum number of bytes to buffer.
     * @return size-constrained buffer in terms of the maximum number of bytes it will use.
     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes]]
     */
    def maxBytes(byteLimit: Long): EagerBufferConfig =
      new EagerBufferConfigImpl(Long.MaxValue, byteLimit)

    /**
     * Create a buffer unconstrained by size (either keys or bytes).
     *
     * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
     *
     * If there isn't enough heap available to meet the demand, the application will encounter an
     * [[OutOfMemoryError]] and shut down (not guaranteed to be a graceful exit). Also, note that
     * JVM processes under extreme memory pressure may exhibit poor GC behavior.
     *
     * This is a convenient option if you doubt that your buffer will be that large, but also don't
     * wish to pick particular constraints, such as in testing.
     *
     * This buffer is "strict" in the sense that it will enforce the time bound or crash.
     * It will never emit early.
     *
     * @return a buffer unconstrained by size (either keys or bytes).
     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded]]
     */
    def unbounded(): StrictBufferConfig = new StrictBufferConfigImpl()
  }
}
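The deprecation message above points users at the Java interface, whose static factories cover the same configurations the removed Scala wrappers built through internal classes. A minimal migration sketch follows; the 30-second bound and 1 MiB limit are illustrative values, not from this commit.

import java.time.Duration

import org.apache.kafka.streams.kstream.Suppressed
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig

object SuppressedMigrationSketch {
  def main(args: Array[String]): Unit = {
    // Final-results suppression for a windowed KTable: requires a strict buffer,
    // since emitting early would violate the final-results guarantee.
    val finalResults = Suppressed.untilWindowCloses(BufferConfig.unbounded())

    // Time-bound suppression: hold each key's latest update for 30 seconds,
    // emitting early if the buffer grows beyond roughly 1 MiB.
    val timeLimited = Suppressed.untilTimeLimit(
      Duration.ofSeconds(30),
      BufferConfig.maxBytes(1024 * 1024L).emitEarlyWhenFull()
    )

    println(finalResults)
    println(timeLimited)
  }
}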
@@ -1,93 +0,0 @@ (deleted file: SuppressedTest.scala)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.scala.kstream

import org.apache.kafka.streams.kstream.internals.suppress._
import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

import java.time.Duration

@deprecated(message = "org.apache.kafka.streams.scala.kstream.Suppressed has been deprecated", since = "2.5")
class SuppressedTest {

  @Test
  def testProduceCorrectSuppressionUntilWindowCloses(): Unit = {
    val bufferConfig = BufferConfig.unbounded()
    val suppression = Suppressed.untilWindowCloses[String](bufferConfig)
    assertEquals(suppression, new FinalResultsSuppressionBuilder(null, bufferConfig))
    assertEquals(suppression.withName("soup"), new FinalResultsSuppressionBuilder("soup", bufferConfig))
  }

  @Test
  def testProduceCorrectSuppressionUntilTimeLimit(): Unit = {
    val bufferConfig = BufferConfig.unbounded()
    val duration = Duration.ofMillis(1)
    assertEquals(Suppressed.untilTimeLimit[String](duration, bufferConfig),
                 new SuppressedInternal[String](null, duration, bufferConfig, null, false))
  }

  @Test
  def testProduceCorrectBufferConfigWithMaxRecords(): Unit = {
    assertEquals(BufferConfig.maxRecords(4), new EagerBufferConfigImpl(4, Long.MaxValue))
    assertEquals(BufferConfig.maxRecords(4).withMaxBytes(5), new EagerBufferConfigImpl(4, 5))
  }

  @Test
  def testProduceCorrectBufferConfigWithMaxBytes(): Unit = {
    assertEquals(BufferConfig.maxBytes(4), new EagerBufferConfigImpl(Long.MaxValue, 4))
    assertEquals(BufferConfig.maxBytes(4).withMaxRecords(5), new EagerBufferConfigImpl(5, 4))
  }

  @Test
  def testProduceCorrectBufferConfigWithUnbounded(): Unit =
    assertEquals(BufferConfig.unbounded(),
                 new StrictBufferConfigImpl(Long.MaxValue, Long.MaxValue, BufferFullStrategy.SHUT_DOWN))

  @Test
  def testSupportLongChainsOfFactoryMethods(): Unit = {
    val bc1 = BufferConfig
      .unbounded()
      .emitEarlyWhenFull()
      .withMaxRecords(3L)
      .withMaxBytes(4L)
      .withMaxRecords(5L)
      .withMaxBytes(6L)
    assertEquals(new EagerBufferConfigImpl(5L, 6L), bc1)
    assertEquals(new StrictBufferConfigImpl(5L, 6L, BufferFullStrategy.SHUT_DOWN), bc1.shutDownWhenFull())

    val bc2 = BufferConfig
      .maxBytes(4)
      .withMaxRecords(5)
      .withMaxBytes(6)
      .withNoBound()
      .withMaxBytes(7)
      .withMaxRecords(8)

    assertEquals(new StrictBufferConfigImpl(8L, 7L, BufferFullStrategy.SHUT_DOWN), bc2)
    assertEquals(BufferConfig.unbounded(), bc2.withNoBound())

    val bc3 = BufferConfig
      .maxRecords(5L)
      .withMaxBytes(10L)
      .emitEarlyWhenFull()
      .withMaxRecords(11L)

    assertEquals(new EagerBufferConfigImpl(11L, 10L), bc3)
  }
}