KAFKA-10546: Deprecate old PAPI (#10869)

* Deprecate the old Processor API
* Suppress warnings on all internal usages of the old API
  (which will be migrated in other child tickets of KAFKA-8410)
* Add new KStream#process methods, since KAFKA-10603 has not seen any action.
This commit is contained in:
John Roesler 2021-06-22 09:17:11 -05:00 committed by GitHub
parent b285662bda
commit c3475081c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
126 changed files with 696 additions and 395 deletions

View File

@ -48,11 +48,11 @@ import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;

View File

@ -487,7 +487,8 @@ public class StreamsBuilder {
/**
* Adds a state store to the underlying {@link Topology}.
* <p>
* It is required to connect state stores to {@link org.apache.kafka.streams.processor.Processor Processors}, {@link Transformer Transformers},
* It is required to connect state stores to {@link org.apache.kafka.streams.processor.api.Processor Processors},
* {@link Transformer Transformers},
* or {@link ValueTransformer ValueTransformers} before they can be used.
*
* @param builder the builder used to obtain this state store {@link StateStore} instance
@ -515,7 +516,8 @@ public class StreamsBuilder {
* This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
* <p>
* It is not required to connect a global store to {@link org.apache.kafka.streams.processor.Processor Processors}, {@link Transformer Transformers},
* It is not required to connect a global store to {@link org.apache.kafka.streams.processor.api.Processor Processors},
* {@link Transformer Transformers},
* or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
* <p>
* The supplier should always generate a new instance each time {@link ProcessorSupplier#get()} gets called. Creating

View File

@ -22,11 +22,11 @@ import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
@ -655,7 +655,7 @@ public class Topology {
* will be added to the topology and connected to this processor automatically.
*
* @param name the unique name of the processor node
* @param supplier the supplier used to obtain this node's {@link Processor} instance
* @param supplier the supplier used to obtain this node's {@link org.apache.kafka.streams.processor.Processor} instance
* @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
* and process
* @return itself

View File

@ -23,7 +23,7 @@ package org.apache.kafka.streams.kstream;
* This is a stateless record-by-record operation, i.e, {@link #apply(Object, Object)} is invoked individually for each
* record of a stream.
* If stateful processing is required, consider using
* {@link KStream#process(org.apache.kafka.streams.processor.ProcessorSupplier, String...) KStream#process(...)}.
* {@link KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...) KStream#process(...)}.
*
* @param <K> key type
* @param <V> value type

View File

@ -25,9 +25,9 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
@ -3173,12 +3173,13 @@ public interface KStream<K, V> {
* (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()} )
* <p>
* Note that it is possible to emit multiple records for each input record by using
* {@link ProcessorContext#forward(Object, Object) context#forward()} in
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) context#forward()} in
* {@link Transformer#transform(Object, Object) Transformer#transform()} and
* {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
* Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
* detected at runtime.
* To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
* To ensure type-safety at compile-time,
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) context#forward()} should
* not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
* {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
* If in {@link Transformer#transform(Object, Object) Transformer#transform()} multiple records need to be emitted
@ -3300,12 +3301,13 @@ public interface KStream<K, V> {
* (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()} )
* <p>
* Note that it is possible to emit multiple records for each input record by using
* {@link ProcessorContext#forward(Object, Object) context#forward()} in
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) context#forward()} in
* {@link Transformer#transform(Object, Object) Transformer#transform()} and
* {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
* Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
* detected at runtime.
* To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
* To ensure type-safety at compile-time,
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) context#forward()} should
* not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
* {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
* If in {@link Transformer#transform(Object, Object) Transformer#transform()} multiple records need to be emitted
@ -3432,12 +3434,14 @@ public interface KStream<K, V> {
* or join) is applied to the result {@code KStream}.
* (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()})
* <p>
* Note that it is possible to emit records by using {@link ProcessorContext#forward(Object, Object)
* Note that it is possible to emit records by using
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
* context#forward()} in {@link Transformer#transform(Object, Object) Transformer#transform()} and
* {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
* Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
* detected at runtime.
* To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
* To ensure type-safety at compile-time,
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) context#forward()} should
* not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
* {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
* The supplier should always generate a new instance each time {@link TransformerSupplier#get()} gets called. Creating
@ -3558,12 +3562,14 @@ public interface KStream<K, V> {
* or join) is applied to the result {@code KStream}.
* (cf. {@link #transformValues(ValueTransformerSupplier, String...) transformValues()})
* <p>
* Note that it is possible to emit records by using {@link ProcessorContext#forward(Object, Object)
* Note that it is possible to emit records by using
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
* context#forward()} in {@link Transformer#transform(Object, Object) Transformer#transform()} and
* {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
* Be aware that a mismatch between the types of the emitted records and the type of the stream would only be
* detected at runtime.
* To ensure type-safety at compile-time, {@link ProcessorContext#forward(Object, Object) context#forward()} should
* To ensure type-safety at compile-time,
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) context#forward()} should
* not be used in {@link Transformer#transform(Object, Object) Transformer#transform()} and
* {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}.
* The supplier should always generate a new instance each time {@link TransformerSupplier#get()} gets called. Creating
@ -3649,7 +3655,8 @@ public interface KStream<K, V> {
* a schedule must be registered.
* The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}.
* In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
* pairs can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* pairs can be emitted via
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
* emit a {@link KeyValue} pair.
* <pre>{@code
@ -3757,7 +3764,8 @@ public interface KStream<K, V> {
* a schedule must be registered.
* The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}.
* In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
* pairs can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* pairs can be emitted via
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
* emit a {@link KeyValue} pair.
* <pre>{@code
@ -3870,7 +3878,8 @@ public interface KStream<K, V> {
* {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
* In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
* {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
* can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* can be emitted via
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
* to emit a {@link KeyValue} pair.
* <pre>{@code
@ -3982,7 +3991,8 @@ public interface KStream<K, V> {
* {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
* In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
* {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
* can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* can be emitted via
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
* to emit a {@link KeyValue} pair.
* <pre>{@code
@ -4099,7 +4109,8 @@ public interface KStream<K, V> {
* {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
* In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
* {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
* can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* can be emitted via
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
* emit a {@link KeyValue} pair.
* <pre>{@code
@ -4221,7 +4232,8 @@ public interface KStream<K, V> {
* {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
* In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
* {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
* can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* can be emitted via
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
* emit a {@link KeyValue} pair.
* <pre>{@code
@ -4345,7 +4357,8 @@ public interface KStream<K, V> {
* is an empty {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
* In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
* {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
* can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* can be emitted via
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
* to emit a {@link KeyValue} pair.
* <pre>{@code
@ -4468,7 +4481,8 @@ public interface KStream<K, V> {
* is an empty {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
* In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
* {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
* can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* can be emitted via
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
* to emit a {@link KeyValue} pair.
* <pre>{@code
@ -4524,6 +4538,109 @@ public interface KStream<K, V> {
final Named named,
final String... stateStoreNames);
/**
* Process all records in this stream, one record at a time, by applying a
* {@link org.apache.kafka.streams.processor.Processor} (provided by the given
* {@link org.apache.kafka.streams.processor.ProcessorSupplier}).
* Attaching a state store makes this a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}).
* If you choose not to attach one, this operation is similar to the stateless {@link #foreach(ForeachAction)}
* but allows access to the {@code ProcessorContext} and record metadata.
* This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
* can be observed and additional periodic actions can be performed.
* Note that this is a terminal operation that returns void.
* <p>
* In order for the processor to use state stores, the stores must be added to the topology and connected to the
* processor using at least one of two strategies (though it's not required to connect global state stores; read-only
* access to global state stores is available by default).
* <p>
* The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
* and specify the store names via {@code stateStoreNames} so they will be connected to the processor.
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
* Serdes.String(),
* Serdes.String());
* // add store
* builder.addStateStore(keyValueStoreBuilder);
*
* KStream outputStream = inputStream.processor(new ProcessorSupplier() {
* public Processor get() {
* return new MyProcessor();
* }
* }, "myProcessorState");
* }</pre>
* The second strategy is for the given {@link org.apache.kafka.streams.processor.ProcessorSupplier}
* to implement {@link ConnectedStoreProvider#stores()},
* which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the processor.
* <pre>{@code
* class MyProcessorSupplier implements ProcessorSupplier {
* // supply processor
* Processor get() {
* return new MyProcessor();
* }
*
* // provide store(s) that will be added and connected to the associated processor
* // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
* Set<StoreBuilder> stores() {
* StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
* Serdes.String(),
* Serdes.String());
* return Collections.singleton(keyValueStoreBuilder);
* }
* }
*
* ...
*
* KStream outputStream = inputStream.process(new MyProcessorSupplier());
* }</pre>
* <p>
* With either strategy, within the {@link org.apache.kafka.streams.processor.Processor},
* the state is obtained via the {@link org.apache.kafka.streams.processor.ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* <pre>{@code
* class MyProcessor implements Processor {
* private StateStore state;
*
* void init(ProcessorContext context) {
* this.state = context.getStateStore("myProcessorState");
* // punctuate each second, can access this.state
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* void process(K key, V value) {
* // can access this.state
* }
*
* void close() {
* // can access this.state
* }
* }
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}.
*
* @param processorSupplier an instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier}
* that generates a newly constructed {@link org.apache.kafka.streams.processor.Processor}
* The supplier should always generate a new instance. Creating a single
* {@link org.apache.kafka.streams.processor.Processor} object
* and returning the same object reference in
* {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} is a
* violation of the supplier pattern and leads to runtime exceptions.
* @param stateStoreNames the names of the state stores used by the processor; not required if the supplier
* implements {@link ConnectedStoreProvider#stores()}
* @see #foreach(ForeachAction)
* @see #transform(TransformerSupplier, String...)
* @deprecated Since 3.0. Use {@link KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, java.lang.String...)} instead.
*/
@Deprecated
void process(final org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
final String... stateStoreNames);
/**
* Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given
* {@link ProcessorSupplier}).
@ -4615,7 +4732,110 @@ public interface KStream<K, V> {
* @see #foreach(ForeachAction)
* @see #transform(TransformerSupplier, String...)
*/
void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
void process(final ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
final String... stateStoreNames);
/**
* Process all records in this stream, one record at a time, by applying a
* {@link org.apache.kafka.streams.processor.Processor} (provided by the given
* {@link org.apache.kafka.streams.processor.ProcessorSupplier}).
* Attaching a state store makes this a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}).
* If you choose not to attach one, this operation is similar to the stateless {@link #foreach(ForeachAction)}
* but allows access to the {@code ProcessorContext} and record metadata.
* This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
* can be observed and additional periodic actions can be performed.
* Note that this is a terminal operation that returns void.
* <p>
* In order for the processor to use state stores, the stores must be added to the topology and connected to the
* processor using at least one of two strategies (though it's not required to connect global state stores; read-only
* access to global state stores is available by default).
* <p>
* The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
* and specify the store names via {@code stateStoreNames} so they will be connected to the processor.
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
* Serdes.String(),
* Serdes.String());
* // add store
* builder.addStateStore(keyValueStoreBuilder);
*
* KStream outputStream = inputStream.processor(new ProcessorSupplier() {
* public Processor get() {
* return new MyProcessor();
* }
* }, "myProcessorState");
* }</pre>
* The second strategy is for the given {@link org.apache.kafka.streams.processor.ProcessorSupplier}
* to implement {@link ConnectedStoreProvider#stores()},
* which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the processor.
* <pre>{@code
* class MyProcessorSupplier implements ProcessorSupplier {
* // supply processor
* Processor get() {
* return new MyProcessor();
* }
*
* // provide store(s) that will be added and connected to the associated processor
* // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
* Set<StoreBuilder> stores() {
* StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
* Serdes.String(),
* Serdes.String());
* return Collections.singleton(keyValueStoreBuilder);
* }
* }
*
* ...
*
* KStream outputStream = inputStream.process(new MyProcessorSupplier());
* }</pre>
* <p>
* With either strategy, within the {@link org.apache.kafka.streams.processor.Processor},
* the state is obtained via the {@link org.apache.kafka.streams.processor.ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* <pre>{@code
* class MyProcessor implements Processor {
* private StateStore state;
*
* void init(ProcessorContext context) {
* this.state = context.getStateStore("myProcessorState");
* // punctuate each second, can access this.state
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* void process(K key, V value) {
* // can access this.state
* }
*
* void close() {
* // can access this.state
* }
* }
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}.
*
* @param processorSupplier an instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier}
* that generates a newly constructed {@link org.apache.kafka.streams.processor.Processor}
* The supplier should always generate a new instance. Creating a single
* {@link org.apache.kafka.streams.processor.Processor} object
* and returning the same object reference in
* {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} is a
* violation of the supplier pattern and leads to runtime exceptions.
* @param named a {@link Named} config used to name the processor in the topology
* @param stateStoreNames the names of the state store used by the processor
* @see #foreach(ForeachAction)
* @see #transform(TransformerSupplier, String...)
* @deprecated Since 3.0. Use {@link KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, org.apache.kafka.streams.kstream.Named, java.lang.String...)} instead.
*/
@Deprecated
void process(final org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
final Named named,
final String... stateStoreNames);
/**
@ -4709,7 +4929,7 @@ public interface KStream<K, V> {
* @see #foreach(ForeachAction)
* @see #transform(TransformerSupplier, String...)
*/
void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
void process(final ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
final Named named,
final String... stateStoreNames);
}

View File

@ -38,7 +38,6 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
class CogroupedStreamAggregateBuilder<K, VOut> {
@ -254,10 +253,11 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
builder);
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private StatefulProcessorNode<K, ?> getStatefulProcessorNode(final String processorName,
final boolean stateCreated,
final StoreBuilder<?> storeBuilder,
final ProcessorSupplier<K, ?> kStreamAggregate) {
final org.apache.kafka.streams.processor.ProcessorSupplier<K, ?> kStreamAggregate) {
final StatefulProcessorNode<K, ?> statefulProcessorNode;
if (!stateCreated) {
statefulProcessorNode =

View File

@ -29,7 +29,6 @@ import org.apache.kafka.streams.kstream.internals.graph.GroupedTableOperationRep
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collections;
@ -68,7 +67,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
this.userProvidedRepartitionTopicName = groupedInternal.name();
}
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private <T> KTable<K, T> doAggregate(final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier,
final NamedInternal named,
final String functionName,
final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
@ -145,7 +145,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
if (materializedInternal.valueSerde() == null) {
materializedInternal.withValueSerde(valueSerde);
}
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(
materializedInternal.storeName(),
adder,
subtractor);
@ -176,7 +177,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
materializedInternal.withValueSerde(Serdes.Long());
}
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
materializedInternal.storeName(),
countInitializer,
countAdder,
@ -221,7 +223,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
if (materializedInternal.keySerde() == null) {
materializedInternal.withKeySerde(keySerde);
}
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
materializedInternal.storeName(),
initializer,
adder,

View File

@ -16,9 +16,8 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorSupplier;
public interface KStreamAggProcessorSupplier<K, RK, V, T> extends ProcessorSupplier<K, V> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public interface KStreamAggProcessorSupplier<K, RK, V, T> extends org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {
KTableValueGetterSupplier<RK, T> view();

View File

@ -19,9 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -31,6 +28,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class);
private final String storeName;
@ -48,7 +46,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
}
@Override
public Processor<K, V> get() {
public org.apache.kafka.streams.processor.Processor<K, V> get() {
return new KStreamAggregateProcessor();
}
@ -58,13 +56,13 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
}
private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
private class KStreamAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
private TimestampedKeyValueStore<K, T> store;
private Sensor droppedRecordsSensor;
private TimestampedTupleForwarder<K, T> tupleForwarder;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
@ -130,7 +128,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
private TimestampedKeyValueStore<K, T> store;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
store = context.getStateStore(storeName);
}

View File

@ -19,15 +19,12 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> {
private final TransformerSupplier<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformerSupplier;
@ -36,7 +33,7 @@ public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupp
}
@Override
public Processor<KIn, VIn> get() {
public org.apache.kafka.streams.processor.Processor<KIn, VIn> get() {
return new KStreamFlatTransformProcessor<>(transformerSupplier.get());
}
@ -45,7 +42,7 @@ public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupp
return transformerSupplier.stores();
}
public static class KStreamFlatTransformProcessor<KIn, VIn, KOut, VOut> extends AbstractProcessor<KIn, VIn> {
public static class KStreamFlatTransformProcessor<KIn, VIn, KOut, VOut> extends org.apache.kafka.streams.processor.AbstractProcessor<KIn, VIn> {
private final Transformer<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformer;
@ -54,7 +51,7 @@ public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupp
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
transformer.init(context);
}

View File

@ -18,16 +18,13 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamFlatTransformValues<KIn, VIn, VOut> implements org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> {
private final ValueTransformerWithKeySupplier<KIn, VIn, Iterable<VOut>> valueTransformerSupplier;
@ -36,7 +33,7 @@ public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupp
}
@Override
public Processor<KIn, VIn> get() {
public org.apache.kafka.streams.processor.Processor<KIn, VIn> get() {
return new KStreamFlatTransformValuesProcessor<>(valueTransformerSupplier.get());
}
@ -45,7 +42,7 @@ public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupp
return valueTransformerSupplier.stores();
}
public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> extends AbstractProcessor<KIn, VIn> {
public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> extends org.apache.kafka.streams.processor.AbstractProcessor<KIn, VIn> {
private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer;
@ -54,7 +51,7 @@ public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupp
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
}

View File

@ -18,10 +18,9 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
class KStreamGlobalKTableJoin<K1, K2, R, V1, V2> implements ProcessorSupplier<K1, V1> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KStreamGlobalKTableJoin<K1, K2, R, V1, V2> implements org.apache.kafka.streams.processor.ProcessorSupplier<K1, V1> {
private final KTableValueGetterSupplier<K2, V2> valueGetterSupplier;
private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends R> joiner;
@ -39,7 +38,7 @@ class KStreamGlobalKTableJoin<K1, K2, R, V1, V2> implements ProcessorSupplier<K1
}
@Override
public Processor<K1, V1> get() {
public org.apache.kafka.streams.processor.Processor<K1, V1> get() {
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), mapper, joiner, leftJoin);
}
}

View File

@ -58,7 +58,7 @@ import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.UnoptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.UnoptimizableRepartitionNode.UnoptimizableRepartitionNodeBuilder;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.kstream.ForeachProcessor;
@ -1215,7 +1215,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final KTableValueGetterSupplier<KG, VG> valueGetterSupplier =
((GlobalKTableImpl<KG, VG>) globalTable).valueGetterSupplier();
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, LEFTJOIN_NAME);
final ProcessorSupplier<K, V> processorSupplier = new KStreamGlobalKTableJoin<>(
// Old PAPI. Needs to be migrated.
@SuppressWarnings("deprecation")
final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> processorSupplier = new KStreamGlobalKTableJoin<>(
valueGetterSupplier,
joiner,
keySelector,
@ -1251,7 +1253,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final NamedInternal renamed = new NamedInternal(joinedInternal.name());
final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
final ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>(
// Old PAPI. Needs to be migrated.
@SuppressWarnings("deprecation")
final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>(
((KTableImpl<K, ?, VO>) table).valueGetterSupplier(),
joiner,
leftJoin);
@ -1466,13 +1470,42 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
public void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
@Deprecated
public void process(final org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
final String... stateStoreNames) {
process(processorSupplier, Named.as(builder.newProcessorName(PROCESSOR_NAME)), stateStoreNames);
}
@Override
public void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
public void process(final ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
final String... stateStoreNames) {
process(processorSupplier, Named.as(builder.newProcessorName(PROCESSOR_NAME)), stateStoreNames);
}
@Override
@Deprecated
public void process(final org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
Objects.requireNonNull(named, "named can't be null");
Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
ApiUtils.checkSupplier(processorSupplier);
for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "stateStoreNames can't be null");
}
final String name = new NamedInternal(named).name();
final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames);
builder.addGraphNode(graphNode, processNode);
}
@Override
public void process(final ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");

View File

@ -16,13 +16,10 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.WindowStore;
class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KStreamJoinWindow<K, V> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {
private final String windowName;
@ -31,17 +28,17 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
}
@Override
public Processor<K, V> get() {
public org.apache.kafka.streams.processor.Processor<K, V> get() {
return new KStreamJoinWindowProcessor();
}
private class KStreamJoinWindowProcessor extends AbstractProcessor<K, V> {
private class KStreamJoinWindowProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
private WindowStore<K, V> window;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
window = (WindowStore<K, V>) context.getStateStore(windowName);

View File

@ -21,10 +21,6 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
@ -40,7 +36,8 @@ import java.util.Optional;
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KStreamKStreamJoin<K, R, V1, V2> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, V1> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);
private final String otherWindowName;
@ -81,17 +78,17 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
}
@Override
public Processor<K, V1> get() {
public org.apache.kafka.streams.processor.Processor<K, V1> get() {
return new KStreamKStreamJoinProcessor();
}
private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
private class KStreamKStreamJoinProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V1> {
private WindowStore<K, V2> otherWindowStore;
private Sensor droppedRecordsSensor;
private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);

View File

@ -18,10 +18,9 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
class KStreamKTableJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KStreamKTableJoin<K, R, V1, V2> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, V1> {
private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) -> key;
private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
@ -37,7 +36,7 @@ class KStreamKTableJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
}
@Override
public Processor<K, V1> get() {
public org.apache.kafka.streams.processor.Processor<K, V1> get() {
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), keyValueMapper, joiner, leftJoin);
}

View File

@ -19,8 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -28,7 +26,8 @@ import org.slf4j.LoggerFactory;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends org.apache.kafka.streams.processor.AbstractProcessor<K1, V1> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoin.class);
private final KTableValueGetter<K2, V2> valueGetter;
@ -48,7 +47,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);

View File

@ -18,9 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -30,6 +27,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamReduce.class);
@ -44,7 +42,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
}
@Override
public Processor<K, V> get() {
public org.apache.kafka.streams.processor.Processor<K, V> get() {
return new KStreamReduceProcessor();
}
@ -54,13 +52,13 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
}
private class KStreamReduceProcessor extends AbstractProcessor<K, V> {
private class KStreamReduceProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
private TimestampedKeyValueStore<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
private Sensor droppedRecordsSensor;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
@ -126,7 +124,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
private TimestampedKeyValueStore<K, V> store;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
store = context.getStateStore(storeName);
}

View File

@ -24,9 +24,6 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
@ -39,6 +36,7 @@ import java.util.List;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class);
@ -63,7 +61,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
}
@Override
public Processor<K, V> get() {
public org.apache.kafka.streams.processor.Processor<K, V> get() {
return new KStreamSessionWindowAggregateProcessor();
}
@ -76,7 +74,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
sendOldValues = true;
}
private class KStreamSessionWindowAggregateProcessor extends AbstractProcessor<K, V> {
private class KStreamSessionWindowAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
private SessionStore<K, Agg> store;
private SessionTupleForwarder<K, Agg> tupleForwarder;
@ -84,7 +82,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
final String threadId = Thread.currentThread().getName();
@ -193,7 +191,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
private SessionStore<K, Agg> store;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
store = context.getStateStore(storeName);
}

View File

@ -24,9 +24,6 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
@ -40,6 +37,7 @@ import java.util.Set;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
private final Logger log = LoggerFactory.getLogger(getClass());
@ -61,7 +59,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
}
@Override
public Processor<K, V> get() {
public org.apache.kafka.streams.processor.Processor<K, V> get() {
return new KStreamSlidingWindowAggregateProcessor();
}
@ -74,7 +72,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
sendOldValues = true;
}
private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
private class KStreamSlidingWindowAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
private TimestampedWindowStore<K, Agg> windowStore;
private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
private Sensor droppedRecordsSensor;
@ -82,7 +80,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
private Boolean reverseIteratorPossible = null;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
@ -509,7 +507,7 @@ public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProce
private TimestampedWindowStore<K, Agg> windowStore;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
windowStore = context.getStateStore(storeName);
}

View File

@ -18,16 +18,13 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamTransformValues<K, V, R> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {
private final ValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier;
@ -36,7 +33,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
}
@Override
public Processor<K, V> get() {
public org.apache.kafka.streams.processor.Processor<K, V> get() {
return new KStreamTransformValuesProcessor<>(valueTransformerSupplier.get());
}
@ -45,7 +42,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
return valueTransformerSupplier.stores();
}
public static class KStreamTransformValuesProcessor<K, V, R> extends AbstractProcessor<K, V> {
public static class KStreamTransformValuesProcessor<K, V, R> extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
private final ValueTransformerWithKey<K, V, R> valueTransformer;
@ -54,7 +51,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
}

View File

@ -23,9 +23,6 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.TimestampedWindowStore;
@ -38,6 +35,7 @@ import java.util.Map;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
private final Logger log = LoggerFactory.getLogger(getClass());
@ -59,7 +57,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
}
@Override
public Processor<K, V> get() {
public org.apache.kafka.streams.processor.Processor<K, V> get() {
return new KStreamWindowAggregateProcessor();
}
@ -73,14 +71,14 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
}
private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
private class KStreamWindowAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
private TimestampedWindowStore<K, Agg> windowStore;
private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
private Sensor droppedRecordsSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
@ -184,7 +182,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
private TimestampedWindowStore<K, Agg> windowStore;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
windowStore = context.getStateStore(storeName);
}

View File

@ -19,14 +19,12 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
private final String storeName;
@ -54,17 +52,17 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
}
@Override
public Processor<K, Change<V>> get() {
public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
return new KTableAggregateProcessor();
}
private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
private class KTableAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
private TimestampedKeyValueStore<K, T> store;
private TimestampedTupleForwarder<K, T> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
tupleForwarder = new TimestampedTupleForwarder<>(

View File

@ -58,7 +58,6 @@ import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressi
import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
@ -133,12 +132,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
private boolean sendOldValues = false;
@SuppressWarnings("deprecation") // Old PAPI compatibility.
public KTableImpl(final String name,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Set<String> subTopologySourceNodes,
final String queryableStoreName,
final ProcessorSupplier<?, ?> processorSupplier,
final org.apache.kafka.streams.processor.ProcessorSupplier<?, ?> processorSupplier,
final GraphNode graphNode,
final InternalStreamsBuilder builder) {
super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
@ -542,7 +542,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final String storeName =
suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME);
final ProcessorSupplier<K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>(
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>(
suppressedInternal,
storeName,
this

View File

@ -19,9 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -31,6 +28,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
private static final Logger LOG = LoggerFactory.getLogger(KTableKTableInnerJoin.class);
@ -43,7 +41,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public Processor<K, Change<V1>> get() {
public org.apache.kafka.streams.processor.Processor<K, Change<V1>> get() {
return new KTableKTableJoinProcessor(valueGetterSupplier2.get());
}
@ -64,7 +62,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
}
private class KTableKTableJoinProcessor extends AbstractProcessor<K, Change<V1>> {
private class KTableKTableJoinProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V1>> {
private final KTableValueGetter<K, V2> valueGetter;
private Sensor droppedRecordsSensor;
@ -74,7 +72,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
@ -137,7 +135,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
valueGetter1.init(context);
valueGetter2.init(context);
}

View File

@ -16,9 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -26,6 +23,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
private final KTableProcessorSupplier<K, ?, V> parent1;
@ -46,7 +44,7 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
}
@Override
public Processor<K, Change<V>> get() {
public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
return new KTableKTableJoinMergeProcessor();
}
@ -96,13 +94,13 @@ public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
return new KTableKTableJoinMerger<>(parent1, parent2, queryableName);
}
private class KTableKTableJoinMergeProcessor extends AbstractProcessor<K, Change<V>> {
private class KTableKTableJoinMergeProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
private TimestampedKeyValueStore<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
if (queryableName != null) {
store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);

View File

@ -18,9 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -31,6 +28,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.d
import static org.apache.kafka.streams.processor.internals.RecordQueue.UNKNOWN;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
private static final Logger LOG = LoggerFactory.getLogger(KTableKTableLeftJoin.class);
@ -41,7 +39,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public Processor<K, Change<V1>> get() {
public org.apache.kafka.streams.processor.Processor<K, Change<V1>> get() {
return new KTableKTableLeftJoinProcessor(valueGetterSupplier2.get());
}
@ -63,7 +61,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, Change<V1>> {
private class KTableKTableLeftJoinProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V1>> {
private final KTableValueGetter<K, V2> valueGetter;
private Sensor droppedRecordsSensor;
@ -73,7 +71,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
@ -143,7 +141,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
valueGetter1.init(context);
valueGetter2.init(context);
}

View File

@ -18,9 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -31,6 +28,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.d
import static org.apache.kafka.streams.processor.internals.RecordQueue.UNKNOWN;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
private static final Logger LOG = LoggerFactory.getLogger(KTableKTableOuterJoin.class);
@ -41,7 +39,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public Processor<K, Change<V1>> get() {
public org.apache.kafka.streams.processor.Processor<K, Change<V1>> get() {
return new KTableKTableOuterJoinProcessor(valueGetterSupplier2.get());
}
@ -62,7 +60,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
}
private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, Change<V1>> {
private class KTableKTableOuterJoinProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V1>> {
private final KTableValueGetter<K, V2> valueGetter;
private Sensor droppedRecordsSensor;
@ -72,7 +70,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
@ -138,7 +136,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
valueGetter1.init(context);
valueGetter2.init(context);
}

View File

@ -18,9 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -30,6 +27,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
private static final Logger LOG = LoggerFactory.getLogger(KTableKTableRightJoin.class);
@ -40,7 +38,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public Processor<K, Change<V1>> get() {
public org.apache.kafka.streams.processor.Processor<K, Change<V1>> get() {
return new KTableKTableRightJoinProcessor(valueGetterSupplier2.get());
}
@ -61,7 +59,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
}
private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, Change<V1>> {
private class KTableKTableRightJoinProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V1>> {
private final KTableValueGetter<K, V2> valueGetter;
private Sensor droppedRecordsSensor;
@ -71,7 +69,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
@ -134,7 +132,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
valueGetter1.init(context);
valueGetter2.init(context);
}

View File

@ -17,15 +17,13 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
private final KTableImpl<K, ?, V> parent;
private final ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper;
@ -41,7 +39,7 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
}
@Override
public Processor<K, Change<V>> get() {
public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
return new KTableMapValuesProcessor();
}
@ -104,12 +102,12 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
}
private class KTableMapValuesProcessor extends AbstractProcessor<K, Change<V>> {
private class KTableMapValuesProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
private TimestampedKeyValueStore<K, V1> store;
private TimestampedTupleForwarder<K, V1> tupleForwarder;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
if (queryableName != null) {
store = context.getStateStore(queryableName);
@ -154,7 +152,7 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
parentGetter.init(context);
}

View File

@ -16,14 +16,12 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.Collection;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTablePassThrough<K, V> implements KTableProcessorSupplier<K, V, V> {
private final Collection<KStreamAggProcessorSupplier> parents;
private final String storeName;
@ -35,7 +33,7 @@ public class KTablePassThrough<K, V> implements KTableProcessorSupplier<K, V, V>
}
@Override
public Processor<K, Change<V>> get() {
public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
return new KTablePassThroughProcessor();
}
@ -64,7 +62,7 @@ public class KTablePassThrough<K, V> implements KTableProcessorSupplier<K, V, V>
};
}
private class KTablePassThroughProcessor extends AbstractProcessor<K, Change<V>> {
private class KTablePassThroughProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
@Override
public void process(final K key, final Change<V> value) {
context().forward(key, value);
@ -75,7 +73,7 @@ public class KTablePassThrough<K, V> implements KTableProcessorSupplier<K, V, V>
private TimestampedKeyValueStore<K, V> store;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
store = context.getStateStore(storeName);
}

View File

@ -16,9 +16,8 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorSupplier;
public interface KTableProcessorSupplier<K, V, T> extends ProcessorSupplier<K, Change<V>> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public interface KTableProcessorSupplier<K, V, T> extends org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> {
KTableValueGetterSupplier<K, T> view();

View File

@ -18,14 +18,12 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
private final String storeName;
@ -48,18 +46,18 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
}
@Override
public Processor<K, Change<V>> get() {
public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
return new KTableReduceProcessor();
}
private class KTableReduceProcessor extends AbstractProcessor<K, Change<V>> {
private class KTableReduceProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
private TimestampedKeyValueStore<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
tupleForwarder = new TimestampedTupleForwarder<>(

View File

@ -19,9 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@ -31,6 +28,7 @@ import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
* <p>
* Given the input, it can output at most two records (one mapped from old value and one mapped from new value).
*/
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSupplier<K, V, KeyValue<K1, V1>> {
private final KTableImpl<K, ?, V> parent;
@ -42,7 +40,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
}
@Override
public Processor<K, Change<V>> get() {
public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
return new KTableMapProcessor();
}
@ -72,7 +70,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
throw new IllegalStateException("KTableRepartitionMap should always require sending old values.");
}
private class KTableMapProcessor extends AbstractProcessor<K, Change<V>> {
private class KTableMapProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
/**
* @throws StreamsException if key is null
@ -103,14 +101,14 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
private class KTableMapValueGetter implements KTableValueGetter<K, KeyValue<K1, V1>> {
private final KTableValueGetter<K, V> parentGetter;
private ProcessorContext context;
private org.apache.kafka.streams.processor.ProcessorContext context;
KTableMapValueGetter(final KTableValueGetter<K, V> parentGetter) {
this.parentGetter = parentGetter;
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
this.context = context;
parentGetter.init(context);
}

View File

@ -17,10 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -31,7 +27,8 @@ import java.util.Objects;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableSource<K, V> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
private final String storeName;
@ -51,7 +48,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
}
@Override
public Processor<K, V> get() {
public org.apache.kafka.streams.processor.Processor<K, V> get() {
return new KTableSourceProcessor();
}
@ -72,7 +69,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
return queryableName != null;
}
private class KTableSourceProcessor extends AbstractProcessor<K, V> {
private class KTableSourceProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
private TimestampedKeyValueStore<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
@ -80,7 +77,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);

View File

@ -19,9 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@ -33,6 +30,7 @@ import java.util.Objects;
import static org.apache.kafka.streams.processor.internals.RecordQueue.UNKNOWN;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
private final KTableImpl<K, ?, V> parent;
private final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> transformerSupplier;
@ -48,7 +46,7 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
}
@Override
public Processor<K, Change<V>> get() {
public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
return new KTableTransformValuesProcessor(transformerSupplier.get());
}
@ -87,7 +85,7 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
return sendOldValues;
}
private class KTableTransformValuesProcessor extends AbstractProcessor<K, Change<V>> {
private class KTableTransformValuesProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
private final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;
private TimestampedKeyValueStore<K, V1> store;
private TimestampedTupleForwarder<K, V1> tupleForwarder;
@ -97,7 +95,7 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
if (queryableName != null) {
@ -143,7 +141,7 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V
}
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
internalProcessorContext = (InternalProcessorContext) context;
parentGetter.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));

View File

@ -22,10 +22,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
@ -37,7 +33,8 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements ProcessorSupplier<KO, Change<VO>> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements org.apache.kafka.streams.processor.ProcessorSupplier<KO, Change<VO>> {
private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionProcessorSupplier.class);
private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
private final CombinedKeySchema<KO, K> keySchema;
@ -51,17 +48,17 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements Proc
}
@Override
public Processor<KO, Change<VO>> get() {
public org.apache.kafka.streams.processor.Processor<KO, Change<VO>> get() {
return new KTableKTableJoinProcessor();
}
private final class KTableKTableJoinProcessor extends AbstractProcessor<KO, Change<VO>> {
private final class KTableKTableJoinProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<KO, Change<VO>> {
private Sensor droppedRecordsSensor;
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(

View File

@ -21,10 +21,6 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.internals.Murmur3;
@ -40,7 +36,8 @@ import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.Subscrip
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSupplier<K, Change<V>> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> {
private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionSendProcessorSupplier.class);
private final Function<V, KO> foreignKeyExtractor;
@ -65,11 +62,11 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
}
@Override
public Processor<K, Change<V>> get() {
public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
return new UnbindChangeProcessor();
}
private class UnbindChangeProcessor extends AbstractProcessor<K, Change<V>> {
private class UnbindChangeProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
private Sensor droppedRecordsSensor;
private String foreignKeySerdeTopic;
@ -77,7 +74,7 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
foreignKeySerdeTopic = foreignKeySerdeTopicSupplier.get();
valueSerdeTopic = valueSerdeTopicSupplier.get();

View File

@ -21,10 +21,6 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -39,8 +35,9 @@ import java.util.Objects;
* @param <KO> Type of foreign key
* @param <VO> Type of foreign value
*/
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
implements ProcessorSupplier<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> {
implements org.apache.kafka.streams.processor.ProcessorSupplier<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> {
private final KTableValueGetterSupplier<KO, VO> foreignValueGetterSupplier;
@ -49,14 +46,14 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
}
@Override
public Processor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> get() {
public org.apache.kafka.streams.processor.Processor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> get() {
return new AbstractProcessor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>>() {
return new org.apache.kafka.streams.processor.AbstractProcessor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>>() {
private KTableValueGetter<KO, VO> foreignValues;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
foreignValues = foreignValueGetterSupplier.get();
foreignValues.init(context);

View File

@ -22,10 +22,6 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3;
@ -41,7 +37,8 @@ import java.util.function.Supplier;
* @param <VO> Type of foreign values
* @param <VR> Type of joined result of primary and foreign values
*/
public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, SubscriptionResponseWrapper<VO>> {
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
private final Serializer<V> constructionTimeValueSerializer;
private final Supplier<String> valueHashSerdePseudoTopicSupplier;
@ -61,8 +58,8 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
}
@Override
public Processor<K, SubscriptionResponseWrapper<VO>> get() {
return new AbstractProcessor<K, SubscriptionResponseWrapper<VO>>() {
public org.apache.kafka.streams.processor.Processor<K, SubscriptionResponseWrapper<VO>> get() {
return new org.apache.kafka.streams.processor.AbstractProcessor<K, SubscriptionResponseWrapper<VO>>() {
private String valueHashSerdePseudoTopic;
private Serializer<V> runtimeValueSerializer = constructionTimeValueSerializer;
@ -70,7 +67,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
valueHashSerdePseudoTopic = valueHashSerdePseudoTopicSupplier.get();
valueGetter = valueGetterSupplier.get();

View File

@ -21,10 +21,6 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
@ -34,8 +30,9 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
implements ProcessorSupplier<KO, SubscriptionWrapper<K>> {
implements org.apache.kafka.streams.processor.ProcessorSupplier<KO, SubscriptionWrapper<K>> {
private static final Logger LOG = LoggerFactory.getLogger(SubscriptionStoreReceiveProcessorSupplier.class);
private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
@ -50,15 +47,15 @@ public class SubscriptionStoreReceiveProcessorSupplier<K, KO>
}
@Override
public Processor<KO, SubscriptionWrapper<K>> get() {
public org.apache.kafka.streams.processor.Processor<KO, SubscriptionWrapper<K>> get() {
return new AbstractProcessor<KO, SubscriptionWrapper<K>>() {
return new org.apache.kafka.streams.processor.AbstractProcessor<KO, SubscriptionWrapper<K>>() {
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;
private Sensor droppedRecordsSensor;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;

View File

@ -23,7 +23,6 @@ import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamSlidingWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.processor.ProcessorSupplier;
public final class GraphGraceSearchUtil {
private GraphGraceSearchUtil() {}
@ -72,7 +71,8 @@ public final class GraphGraceSearchUtil {
private static Long extractGracePeriod(final GraphNode node) {
if (node instanceof StatefulProcessorNode) {
final ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().oldProcessorSupplier();
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
final org.apache.kafka.streams.processor.ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().oldProcessorSupplier();
if (processorSupplier instanceof KStreamWindowAggregate) {
final KStreamWindowAggregate kStreamWindowAggregate = (KStreamWindowAggregate) processorSupplier;
final Windows windows = kStreamWindowAggregate.windows();

View File

@ -34,10 +34,12 @@ public class ProcessorParameters<KIn, VIn, KOut, VOut> {
// During the transition to KIP-478, we capture arguments passed from the old API to simplify
// the performance of casts that we still need to perform. This will eventually be removed.
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier;
private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
private final String processorName;
@SuppressWarnings("deprecation") // Old PAPI compatibility.
public ProcessorParameters(final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> processorSupplier,
final String processorName) {
oldProcessorSupplier = processorSupplier;
@ -56,6 +58,7 @@ public class ProcessorParameters<KIn, VIn, KOut, VOut> {
return processorSupplier;
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier() {
return oldProcessorSupplier;
}

View File

@ -101,6 +101,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
}
// temporary hack until KIP-478 is fully implemented
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> oldProcessorSupplier =
processorParameters().oldProcessorSupplier();
if (oldProcessorSupplier != null && oldProcessorSupplier.stores() != null) {

View File

@ -26,9 +26,6 @@ import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
@ -38,6 +35,7 @@ import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import static java.util.Objects.requireNonNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSupplier<K, V, V> {
private final SuppressedInternal<K> suppress;
private final String storeName;
@ -54,7 +52,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
}
@Override
public Processor<K, Change<V>> get() {
public org.apache.kafka.streams.processor.Processor<K, Change<V>> get() {
return new KTableSuppressProcessor<>(suppress, storeName);
}
@ -70,7 +68,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
private TimeOrderedKeyValueBuffer<K, V> buffer;
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
parentGetter.init(context);
// the main processor is responsible for the buffer's lifecycle
buffer = requireNonNull(context.getStateStore(storeName));
@ -111,7 +109,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
return parentKTable.enableSendingOldValues(forceMaterialization);
}
private static final class KTableSuppressProcessor<K, V> extends AbstractProcessor<K, Change<V>> {
private static final class KTableSuppressProcessor<K, V> extends org.apache.kafka.streams.processor.AbstractProcessor<K, Change<V>> {
private final long maxRecords;
private final long maxBytes;
private final long suppressDurationMillis;
@ -138,7 +136,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSup
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
internalProcessorContext = (InternalProcessorContext) context;
suppressionEmitSensor = ProcessorNodeMetrics.suppressionEmitSensor(

View File

@ -22,7 +22,10 @@ package org.apache.kafka.streams.processor;
*
* @param <K> the type of keys
* @param <V> the type of values
* @deprecated Since 3.0. Use {@link org.apache.kafka.streams.processor.api.Processor} or
* {@link org.apache.kafka.streams.processor.api.ContextualProcessor} instead.
*/
@Deprecated
public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
protected ProcessorContext context;

View File

@ -91,8 +91,8 @@ import java.util.Set;
* }</pre>
*
* @see Topology#addProcessor(String, org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see KStream#process(ProcessorSupplier, String...)
* @see KStream#process(ProcessorSupplier, Named, String...)
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, Named, String...)
* @see KStream#transform(TransformerSupplier, String...)
* @see KStream#transform(TransformerSupplier, Named, String...)
* @see KStream#transformValues(ValueTransformerSupplier, String...)

View File

@ -23,7 +23,9 @@ import java.time.Duration;
*
* @param <K> the type of keys
* @param <V> the type of values
* @deprecated Since 3.0. Use {@link org.apache.kafka.streams.processor.api.Processor} instead.
*/
@Deprecated
public interface Processor<K, V> {
/**

View File

@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
/**
* Processor context interface.
*/
@SuppressWarnings("deprecation") // Not deprecating the old context, since it is used by Transformers. See KAFKA-10603.
public interface ProcessorContext {
/**

View File

@ -33,7 +33,9 @@ import java.util.function.Supplier;
*
* @param <K> the type of keys
* @param <V> the type of values
* @deprecated Since 3.0. Use {@link org.apache.kafka.streams.processor.api.ProcessorSupplier} instead.
*/
@Deprecated
public interface ProcessorSupplier<K, V> extends ConnectedStoreProvider, Supplier<Processor<K, V>> {
/**

View File

@ -21,7 +21,7 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
/**
* The context associated with the current record being processed by
* an {@link Processor}
* an {@link org.apache.kafka.streams.processor.api.Processor}
*/
public interface RecordContext {
@ -79,8 +79,10 @@ public interface RecordContext {
* <p> If it is triggered while processing a record streamed from the source processor,
* timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
* {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
* Note, that an upstream {@link Processor} might have set a new timestamp by calling
* {@link ProcessorContext#forward(Object, Object, To) forward(..., To.all().withTimestamp(...))}.
* Note, that an upstream {@link org.apache.kafka.streams.processor.api.Processor}
* might have set a new timestamp by calling
* {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(org.apache.kafka.streams.processor.api.Record)
* forward(..., To.all().withTimestamp(...))}.
* In particular, some Kafka Streams DSL operators set result record timestamps explicitly,
* to guarantee deterministic results.
*

View File

@ -217,6 +217,7 @@ public class InternalTopologyBuilder {
this.supplier = supplier;
}
@SuppressWarnings("deprecation") // Old PAPI compatibility.
ProcessorNodeFactory(final String name,
final String[] predecessors,
final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> supplier) {

View File

@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
@SuppressWarnings("deprecation") // Old PAPI compatibility
public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
private final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate;
private InternalProcessorContext context;

View File

@ -62,7 +62,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
}
public ProcessorNode(final String name,
final org.apache.kafka.streams.processor.Processor<KIn, VIn> processor,
@SuppressWarnings("deprecation") final org.apache.kafka.streams.processor.Processor<KIn, VIn> processor,
final Set<String> stateStores) {
this.name = name;

View File

@ -57,7 +57,7 @@ import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDur
* .withCachingDisabled());
* }</pre>
* When using the Processor API, i.e., {@link org.apache.kafka.streams.Topology Topology}, users create
* {@link StoreBuilder}s that can be attached to {@link org.apache.kafka.streams.processor.Processor Processor}s.
* {@link StoreBuilder}s that can be attached to {@link org.apache.kafka.streams.processor.api.Processor Processor}s.
* For example, you can create a {@link org.apache.kafka.streams.kstream.Windowed windowed} RocksDB store with custom
* changelog topic configuration like:
* <pre>{@code

View File

@ -77,6 +77,7 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class StreamsBuilderTest {
private static final String STREAM_TOPIC = "stream-topic";

View File

@ -66,6 +66,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertNotNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Category({IntegrationTest.class})
public class GlobalKTableIntegrationTest {
private static final int NUM_BROKERS = 1;

View File

@ -30,7 +30,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
@ -108,6 +107,7 @@ public class GlobalThreadShutDownOrderTest {
@Rule
public TestName testName = new TestName();
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Before
public void before() throws Exception {
builder = new StreamsBuilder();
@ -196,7 +196,8 @@ public class GlobalThreadShutDownOrderTest {
}
private class GlobalStoreProcessor extends AbstractProcessor<String, Long> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private class GlobalStoreProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, Long> {
private KeyValueStore<String, Long> store;
private final String storeName;

View File

@ -40,7 +40,6 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.TrackingS
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
@ -292,6 +291,7 @@ public class RestoreIntegrationTest {
assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void shouldProcessDataFromStoresWithLoggingDisabled() throws InterruptedException {
@ -430,7 +430,8 @@ public class RestoreIntegrationTest {
}
}
public static class KeyValueStoreProcessor implements Processor<Integer, Integer> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public static class KeyValueStoreProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
private final String topic;
private final CountDownLatch processorLatch;

View File

@ -27,7 +27,6 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@ -60,6 +59,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Category({IntegrationTest.class})
public class StoreUpgradeIntegrationTest {
private static final String STORE_NAME = "store";
@ -953,7 +953,8 @@ public class StoreUpgradeIntegrationTest {
"Could not get expected result in time.");
}
private static class KeyValueProcessor implements Processor<Integer, Integer> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private static class KeyValueProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
private KeyValueStore<Integer, Long> store;
@SuppressWarnings("unchecked")
@ -980,7 +981,8 @@ public class StoreUpgradeIntegrationTest {
public void close() {}
}
private static class TimestampedKeyValueProcessor implements Processor<Integer, Integer> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private static class TimestampedKeyValueProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
private ProcessorContext context;
private TimestampedKeyValueStore<Integer, Long> store;
@ -1013,7 +1015,8 @@ public class StoreUpgradeIntegrationTest {
public void close() {}
}
private static class WindowedProcessor implements Processor<Integer, Integer> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private static class WindowedProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
private WindowStore<Integer, Long> store;
@SuppressWarnings("unchecked")
@ -1040,7 +1043,8 @@ public class StoreUpgradeIntegrationTest {
public void close() {}
}
private static class TimestampedWindowedProcessor implements Processor<Integer, Integer> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private static class TimestampedWindowedProcessor implements org.apache.kafka.streams.processor.Processor<Integer, Integer> {
private ProcessorContext context;
private TimestampedWindowStore<Integer, Long> store;

View File

@ -31,7 +31,6 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.IntegrationTest;
@ -226,7 +225,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
timestamp);
}
private static class ShutdownProcessor extends AbstractProcessor<String, String> {
private static class ShutdownProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
final List<String> valueList;
ShutdownProcessor(final List<String> valueList) {

View File

@ -27,7 +27,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
@ -82,6 +81,7 @@ public class TaskMetadataIntegrationTest {
private AtomicBoolean process;
private AtomicBoolean commit;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Before
public void setup() {
final String testId = safeUniqueTestName(getClass(), testName);
@ -180,7 +180,8 @@ public class TaskMetadataIntegrationTest {
timestamp);
}
private class PauseProcessor extends AbstractProcessor<String, String> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private class PauseProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
@Override
public void process(final String key, final String value) {
while (!process.get()) {

View File

@ -23,7 +23,6 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.junit.Test;
import java.time.Duration;
@ -463,6 +462,7 @@ public class RepartitionTopicNamingTest {
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private Topology buildTopology(final String optimizationConfig) {
final Initializer<Integer> initializer = () -> 0;
final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
@ -505,7 +505,8 @@ public class RepartitionTopicNamingTest {
}
private static class SimpleProcessor extends AbstractProcessor<String, String> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private static class SimpleProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
final List<String> valueList;

View File

@ -30,9 +30,6 @@ import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Test;
@ -44,6 +41,7 @@ import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class AbstractStreamTest {
@Test
@ -71,6 +69,7 @@ public class AbstractStreamTest {
verify(valueTransformerWithKeySupplier);
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testShouldBeExtensible() {
final StreamsBuilder builder = new StreamsBuilder();
@ -108,7 +107,7 @@ public class AbstractStreamTest {
}
}
private static class ExtendedKStreamDummy<K, V> implements ProcessorSupplier<K, V> {
private static class ExtendedKStreamDummy<K, V> implements org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {
private final Random rand;
@ -117,11 +116,11 @@ public class AbstractStreamTest {
}
@Override
public Processor<K, V> get() {
public org.apache.kafka.streams.processor.Processor<K, V> get() {
return new ExtendedKStreamDummyProcessor();
}
private class ExtendedKStreamDummyProcessor extends AbstractProcessor<K, V> {
private class ExtendedKStreamDummyProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
@Override
public void process(final K key, final V value) {
// flip a coin and filter

View File

@ -56,6 +56,7 @@ public class GlobalKTableJoinsTest {
keyValueMapper = (key, value) -> value;
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void shouldLeftJoinWithStream() {
final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();
@ -71,6 +72,7 @@ public class GlobalKTableJoinsTest {
verifyJoin(expected, supplier);
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void shouldInnerJoinWithStream() {
final MockProcessorSupplier<String, String> supplier = new MockProcessorSupplier<>();

View File

@ -62,6 +62,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KGroupedStreamImplTest {
private static final String TOPIC = "topic";

View File

@ -126,6 +126,7 @@ public class KGroupedTableImplTest {
Materialized.as(INVALID_STORE_NAME)));
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private MockProcessorSupplier<String, Integer> getReducedResults(final KTable<String, Integer> inputKTable) {
final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
inputKTable

View File

@ -40,7 +40,7 @@ public class KStreamBranchTest {
private final String topicName = "topic";
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
@SuppressWarnings({"unchecked", "deprecation"})
@SuppressWarnings({"unchecked", "deprecation"}) // Old PAPI. Needs to be migrated.
@Test
public void testKStreamBranch() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -40,6 +40,7 @@ public class KStreamFilterTest {
private final Predicate<Integer, String> isMultipleOfThree = (key, value) -> (key % 3) == 0;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testFilter() {
final StreamsBuilder builder = new StreamsBuilder();
@ -61,6 +62,7 @@ public class KStreamFilterTest {
assertEquals(2, supplier.theCapturedProcessor().processed().size());
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testFilterNot() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -41,6 +41,7 @@ import static org.junit.Assert.assertEquals;
public class KStreamFlatMapTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testFlatMap() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -41,6 +41,7 @@ public class KStreamFlatMapValuesTest {
private final String topicName = "topic";
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testFlatMapValues() {
final StreamsBuilder builder = new StreamsBuilder();
@ -77,6 +78,7 @@ public class KStreamFlatMapValuesTest {
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testFlatMapValuesWithKeys() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -20,7 +20,6 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransform.KStreamFlatTransformProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@ -32,6 +31,7 @@ import java.util.Collections;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamFlatTransformTest extends EasyMockSupport {
private Number inputKey;
@ -129,7 +129,7 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
EasyMock.expect(transformerSupplier.get()).andReturn(transformer);
replayAll();
final Processor<Number, Number> processor = processorSupplier.get();
final org.apache.kafka.streams.processor.Processor<Number, Number> processor = processorSupplier.get();
verifyAll();
assertTrue(processor instanceof KStreamFlatTransformProcessor);

View File

@ -24,7 +24,6 @@ import java.util.Collections;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransformValues.KStreamFlatTransformValuesProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.easymock.EasyMock;
@ -32,6 +31,7 @@ import org.easymock.EasyMockSupport;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamFlatTransformValuesTest extends EasyMockSupport {
private Integer inputKey;
@ -127,7 +127,7 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
EasyMock.expect(valueTransformerSupplier.get()).andReturn(valueTransformer);
replayAll();
final Processor<Integer, Integer> processor = processorSupplier.get();
final org.apache.kafka.streams.processor.Processor<Integer, Integer> processor = processorSupplier.get();
verifyAll();
assertTrue(processor instanceof KStreamFlatTransformValuesProcessor);

View File

@ -55,6 +55,7 @@ public class KStreamGlobalKTableJoinTest {
private MockProcessor<Integer, String> processor;
private StreamsBuilder builder;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Before
public void setUp() {

View File

@ -55,6 +55,7 @@ public class KStreamGlobalKTableLeftJoinTest {
private TopologyTestDriver driver;
private StreamsBuilder builder;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Before
public void setUp() {

View File

@ -56,6 +56,7 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
@ -99,6 +100,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamImplTest {
private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
@ -2381,7 +2383,7 @@ public class KStreamImplTest {
public void shouldNotAllowNullProcessSupplierOnProcess() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.process(null));
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
}
@ -2389,7 +2391,8 @@ public class KStreamImplTest {
public void shouldNotAllowNullProcessSupplierOnProcessWithStores() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.process(null, "storeName"));
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
"storeName"));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
}
@ -2397,7 +2400,8 @@ public class KStreamImplTest {
public void shouldNotAllowNullProcessSupplierOnProcessWithNamed() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.process(null, Named.as("processor")));
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
Named.as("processor")));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
}
@ -2405,7 +2409,8 @@ public class KStreamImplTest {
public void shouldNotAllowNullProcessSupplierOnProcessWithNamedAndStores() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.process(null, Named.as("processor"), "stateStore"));
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
Named.as("processor"), "stateStore"));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null"));
}

View File

@ -63,6 +63,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamKStreamJoinTest {
private final String topic1 = "topic1";
private final String topic2 = "topic2";

View File

@ -53,6 +53,7 @@ import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamKStreamLeftJoinTest {
private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];

View File

@ -52,6 +52,7 @@ import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamKStreamOuterJoinTest {
private final String topic1 = "topic1";
private final String topic2 = "topic2";

View File

@ -66,6 +66,7 @@ public class KStreamKTableJoinTest {
private StreamsBuilder builder;
private final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Before
public void setUp() {
builder = new StreamsBuilder();

View File

@ -59,6 +59,7 @@ public class KStreamKTableLeftJoinTest {
private MockProcessor<Integer, String> processor;
private StreamsBuilder builder;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Before
public void setUp() {
builder = new StreamsBuilder();

View File

@ -39,6 +39,7 @@ import static org.junit.Assert.assertEquals;
public class KStreamMapTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testMap() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -39,6 +39,7 @@ public class KStreamMapValuesTest {
private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testFlatMapValues() {
final StreamsBuilder builder = new StreamsBuilder();
@ -63,6 +64,7 @@ public class KStreamMapValuesTest {
assertArrayEquals(expected, supplier.theCapturedProcessor().processed().toArray());
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testMapValuesWithKeys() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -41,6 +41,7 @@ public class KStreamSelectKeyTest {
private final String topicName = "topic_key_select";
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testSelectKey() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -33,7 +33,6 @@ import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@ -68,6 +67,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamSessionWindowAggregateProcessorTest {
private static final long GAP_MS = 5 * 60 * 1000L;
@ -87,7 +87,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
sessionMerger);
private final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> results = new ArrayList<>();
private final Processor<String, String> processor = sessionAggregator.get();
private final org.apache.kafka.streams.processor.Processor<String, String> processor = sessionAggregator.get();
private SessionStore<String, Long> sessionStore;
private InternalMockProcessorContext context;
private final Metrics metrics = new Metrics();
@ -404,7 +404,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
@Test
public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() {
setup(false);
final Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
final org.apache.kafka.streams.processor.Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
SessionWindows.with(ofMillis(10L)).grace(ofMillis(0L)),
STORE_NAME,
initializer,
@ -469,7 +469,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
@Test
public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() {
setup(false);
final Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
final org.apache.kafka.streams.processor.Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
SessionWindows.with(ofMillis(10L)).grace(ofMillis(1L)),
STORE_NAME,
initializer,

View File

@ -81,6 +81,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@RunWith(Parameterized.class)
public class KStreamSlidingWindowAggregateTest {

View File

@ -44,6 +44,7 @@ public class KStreamTransformTest {
private static final String TOPIC_NAME = "topic";
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testTransform() {
final StreamsBuilder builder = new StreamsBuilder();
@ -106,6 +107,7 @@ public class KStreamTransformTest {
}
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testTransformWithNewDriverAndPunctuator() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -27,7 +27,6 @@ import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.TestInputTopic;
@ -47,6 +46,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
@RunWith(EasyMockRunner.class)
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamTransformValuesTest {
private final String topicName = "topic";
private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
@ -54,6 +54,7 @@ public class KStreamTransformValuesTest {
@Mock(MockType.NICE)
private ProcessorContext context;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testTransform() {
final StreamsBuilder builder = new StreamsBuilder();
@ -96,6 +97,7 @@ public class KStreamTransformValuesTest {
assertArrayEquals(expected, supplier.theCapturedProcessor().processed().toArray());
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testTransformWithKey() {
final StreamsBuilder builder = new StreamsBuilder();
@ -143,7 +145,7 @@ public class KStreamTransformValuesTest {
public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {
final NoOpValueTransformerWithKeySupplier<String, String> transformer = new NoOpValueTransformerWithKeySupplier<>();
final KStreamTransformValues<String, String, String> transformValues = new KStreamTransformValues<>(transformer);
final Processor<String, String> processor = transformValues.get();
final org.apache.kafka.streams.processor.Processor<String, String> processor = transformValues.get();
processor.init(context);

View File

@ -66,6 +66,7 @@ public class KStreamWindowAggregateTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final String threadId = Thread.currentThread().getName();
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testAggBasic() {
final StreamsBuilder builder = new StreamsBuilder();
@ -143,6 +144,7 @@ public class KStreamWindowAggregateTest {
);
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testJoin() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -49,6 +49,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.junit.Assert.assertEquals;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableAggregateTest {
private final Serde<String> stringSerde = Serdes.String();
private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);

View File

@ -66,6 +66,7 @@ public class KTableFilterTest {
private final Predicate<String, Integer> predicate = (key, value) -> (value % 2) == 0;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private void doTestKTable(final StreamsBuilder builder,
final KTable<String, Integer> table2,
final KTable<String, Integer> table3,

View File

@ -83,6 +83,7 @@ public class KTableImplTest {
table = new StreamsBuilder().table("test");
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testKTable() {
final StreamsBuilder builder = new StreamsBuilder();
@ -151,6 +152,7 @@ public class KTableImplTest {
processors.get(3).processed());
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testMaterializedKTable() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -29,7 +29,6 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.TestRecord;
@ -54,6 +53,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableKTableInnerJoinTest {
private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
@ -251,7 +251,7 @@ public class KTableKTableInnerJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
@SuppressWarnings("unchecked")
final Processor<String, Change<String>> join = new KTableKTableInnerJoin<>(
final org.apache.kafka.streams.processor.Processor<String, Change<String>> join = new KTableKTableInnerJoin<>(
(KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())),
(KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())),
null

View File

@ -33,7 +33,6 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
@ -62,6 +61,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableKTableLeftJoinTest {
private final String topic1 = "topic1";
private final String topic2 = "topic2";
@ -517,7 +517,7 @@ public class KTableKTableLeftJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
@SuppressWarnings("unchecked")
final Processor<String, Change<String>> join = new KTableKTableLeftJoin<>(
final org.apache.kafka.streams.processor.Processor<String, Change<String>> join = new KTableKTableLeftJoin<>(
(KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())),
(KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())),
null

View File

@ -28,7 +28,6 @@ import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessor;
@ -52,6 +51,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableKTableOuterJoinTest {
private final String topic1 = "topic1";
private final String topic2 = "topic2";
@ -408,7 +408,7 @@ public class KTableKTableOuterJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
@SuppressWarnings("unchecked")
final Processor<String, Change<String>> join = new KTableKTableOuterJoin<>(
final org.apache.kafka.streams.processor.Processor<String, Change<String>> join = new KTableKTableOuterJoin<>(
(KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())),
(KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())),
null

View File

@ -22,7 +22,6 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
@ -32,6 +31,7 @@ import java.util.Properties;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableKTableRightJoinTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
@ -41,7 +41,7 @@ public class KTableKTableRightJoinTest {
final StreamsBuilder builder = new StreamsBuilder();
@SuppressWarnings("unchecked")
final Processor<String, Change<String>> join = new KTableKTableRightJoin<>(
final org.apache.kafka.streams.processor.Processor<String, Change<String>> join = new KTableKTableRightJoin<>(
(KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())),
(KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())),
null

View File

@ -39,6 +39,7 @@ import static org.junit.Assert.assertEquals;
public class KTableMapKeysTest {
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testMapKeysConvertingToStream() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -71,6 +71,7 @@ public class KTableMapValuesTest {
}
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testKTable() {
final StreamsBuilder builder = new StreamsBuilder();
@ -85,6 +86,7 @@ public class KTableMapValuesTest {
doTestKTable(builder, topic1, supplier);
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testQueryableKTable() {
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -32,13 +31,14 @@ import static java.util.Collections.singleton;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KTableReduceTest {
@Test
public void shouldAddAndSubtract() {
final InternalMockProcessorContext<String, Change<Set<String>>> context = new InternalMockProcessorContext<>();
final Processor<String, Change<Set<String>>> reduceProcessor =
final org.apache.kafka.streams.processor.Processor<String, Change<Set<String>>> reduceProcessor =
new KTableReduce<String, Set<String>>(
"myStore",
this::unionNotNullArgs,

View File

@ -59,6 +59,7 @@ public class KTableSourceTest {
private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testKTable() {
final StreamsBuilder builder = new StreamsBuilder();

Some files were not shown because too many files have changed in this diff Show More