KAFKA-9738: Deprecate old Processor API (#16742)

Implements KIP-1070, which deprecates:
 - Transformer
 - TransformerSupplier
 - ValueTransformer
 - ValueTransformerSupplier
 - MockProcessorContext

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2024-08-02 12:55:53 -07:00 committed by GitHub
parent 5afdb17092
commit 3922cadc5d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 217 additions and 204 deletions

View File

@ -133,6 +133,16 @@
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
</p>
<h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>
<p>
In previous release, a new version of the Processor API was introduced and the old Processor API was
incrementally replaced and deprecated.
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1070%3A+deprecate+MockProcessorContext">KIP-1070</a>
follow this path by deprecating <code>MockProcessorContext</code>, <code>Transformer</code>,
<code>TransformerSupplier</code>, <code>ValueTransformer</code>, and <code>ValueTransformerSupplier</code>.
</p>
<h3><a id="streams_api_changes_390" href="#streams_api_changes_390">Streams API changes in 3.9.0</a></h3>
<p>

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@ -37,7 +36,7 @@ import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Demonstrate the use of {@link MockProcessorContext} for testing the {@link Transformer} in the {@link WordCountTransformerDemo}.
* Demonstrate the use of {@link MockProcessorContext} for testing the {@link org.apache.kafka.streams.kstream.Transformer} in the {@link WordCountTransformerDemo}.
*/
public class WordCountTransformerTest {
@Test

View File

@ -25,8 +25,6 @@ import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
@ -512,8 +510,8 @@ public class StreamsBuilder {
* Adds a state store to the underlying {@link Topology}.
* <p>
* It is required to connect state stores to {@link org.apache.kafka.streams.processor.api.Processor Processors},
* {@link Transformer Transformers},
* or {@link ValueTransformer ValueTransformers} before they can be used.
* {@link org.apache.kafka.streams.kstream.Transformer Transformers},
* or {@link org.apache.kafka.streams.kstream.ValueTransformer ValueTransformers} before they can be used.
*
* @param builder the builder used to obtain this state store {@link StateStore} instance
* @return itself
@ -539,8 +537,8 @@ public class StreamsBuilder {
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
* <p>
* It is not required to connect a global store to {@link org.apache.kafka.streams.processor.api.Processor Processors},
* {@link Transformer Transformers},
* or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
* {@link org.apache.kafka.streams.kstream.Transformer Transformers},
* or {@link org.apache.kafka.streams.kstream.ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
* <p>
* The supplier should always generate a new instance each time {@link ProcessorSupplier#get()} gets called. Creating
* a single {@link Processor} object and returning the same object reference in {@link ProcessorSupplier#get()} would be
@ -588,7 +586,9 @@ public class StreamsBuilder {
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
* <p>
* It is not required to connect a global store to the {@link Processor Processors},
* {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
* {@link org.apache.kafka.streams.kstream.Transformer Transformers},
* or {@link org.apache.kafka.streams.kstream.ValueTransformer ValueTransformer};
* those have read-only access to all global stores by default.
*
* @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null}
* @param topic the topic to source the data from

View File

@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.internals;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;
@ -93,7 +91,8 @@ public final class ApiUtils {
/**
* @throws IllegalArgumentException if the same instance is obtained each time
*/
public static <VR, V> void checkSupplier(final ValueTransformerSupplier<V, VR> supplier) {
@SuppressWarnings("deprecation")
public static <VR, V> void checkSupplier(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<V, VR> supplier) {
if (supplier.get() == supplier.get()) {
final String supplierClass = supplier.getClass().getName();
throw new IllegalArgumentException(String.format("%s generates single reference." +

View File

@ -27,14 +27,14 @@ import org.apache.kafka.streams.KeyValue;
* <li>map from an input record to a new key (with arbitrary key type as specified by {@code VR})</li>
* </ul>
* This is a stateless record-by-record operation, i.e, {@link #apply(Object, Object)} is invoked individually for each
* record of a stream (cf. {@link Transformer} for stateful record transformation).
* record of a stream (cf. {@link org.apache.kafka.streams.processor.api.Processor api.Processor} for stateful record transformation).
* {@code KeyValueMapper} is a generalization of {@link ValueMapper}.
*
* @param <K> key type
* @param <V> value type
* @param <VR> mapped value type
* @see ValueMapper
* @see Transformer
* @see org.apache.kafka.streams.processor.api.Processor
* @see KStream#map(KeyValueMapper)
* @see KStream#flatMap(KeyValueMapper)
* @see KStream#selectKey(KeyValueMapper)

View File

@ -47,7 +47,9 @@ import java.time.Duration;
* @see ValueTransformer
* @see KStream#map(KeyValueMapper)
* @see KStream#flatMap(KeyValueMapper)
* @deprecated Since 4.0. Use {@link org.apache.kafka.streams.processor.api.Processor api.Processor} instead.
*/
@Deprecated
public interface Transformer<K, V, R> {
/**

View File

@ -37,12 +37,14 @@ import java.util.function.Supplier;
* @see ValueTransformer
* @see ValueTransformerSupplier
* @see KStream#transformValues(ValueTransformerSupplier, String...)
* @deprecated Since 4.0. Use {@link org.apache.kafka.streams.processor.api.ProcessorSupplier api.ProcessorSupplier} instead.
*/
@Deprecated
public interface TransformerSupplier<K, V, R> extends ConnectedStoreProvider, Supplier<Transformer<K, V, R>> {
/**
* Return a newly constructed {@link Transformer} instance.
* The supplier should always generate a new instance each time {@link TransformerSupplier#get() gets called}.
* The supplier should always generate a new instance each time {@link TransformerSupplier#get() gets called}.
* <p>
* Creating a single {@link Transformer} object and returning the same object reference in {@link TransformerSupplier#get()}
* is a violation of the supplier pattern and leads to runtime exceptions.

View File

@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import java.time.Duration;
@ -46,7 +47,9 @@ import java.time.Duration;
* @see KStream#transformValues(ValueTransformerSupplier, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see Transformer
* @deprecated Since 4.0. Use {@link FixedKeyProcessor} instead.
*/
@Deprecated
public interface ValueTransformer<V, VR> {
/**

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
/**
* A {@code ValueTransformerSupplier} interface which can create one or more {@link ValueTransformer} instances.
@ -35,12 +36,14 @@ import org.apache.kafka.streams.processor.ConnectedStoreProvider;
* @see Transformer
* @see TransformerSupplier
* @see KStream#transform(TransformerSupplier, String...)
* @deprecated Since 4.0. Use {@link FixedKeyProcessorSupplier} instead.
*/
@Deprecated
public interface ValueTransformerSupplier<V, VR> extends ConnectedStoreProvider {
/**
* Return a newly constructed {@link ValueTransformer} instance.
* The supplier should always generate a new instance each time {@link ValueTransformerSupplier#get()} gets called.
* The supplier should always generate a new instance each time {@link ValueTransformerSupplier#get()} gets called.
* <p>
* Creating a single {@link ValueTransformer} object and returning the same object reference in {@link ValueTransformerSupplier#get()}
* is a violation of the supplier pattern and leads to runtime exceptions.

View File

@ -22,8 +22,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
@ -111,14 +109,15 @@ public abstract class AbstractStream<K, V> {
return (readOnlyKey, value) -> valueMapper.apply(value);
}
@SuppressWarnings("deprecation")
static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> toValueTransformerWithKeySupplier(
final ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
final org.apache.kafka.streams.kstream.ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
ApiUtils.checkSupplier(valueTransformerSupplier);
return new ValueTransformerWithKeySupplier<K, V, VR>() {
@Override
public ValueTransformerWithKey<K, V, VR> get() {
final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
final org.apache.kafka.streams.kstream.ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
return new ValueTransformerWithKey<K, V, VR>() {
@Override
public void init(final ProcessorContext context) {

View File

@ -17,8 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
@ -31,9 +29,11 @@ import java.util.Set;
public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
private final TransformerSupplier<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformerSupplier;
@SuppressWarnings("deprecation")
private final org.apache.kafka.streams.kstream.TransformerSupplier<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformerSupplier;
public KStreamFlatTransform(final TransformerSupplier<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformerSupplier) {
@SuppressWarnings("deprecation")
public KStreamFlatTransform(final org.apache.kafka.streams.kstream.TransformerSupplier<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformerSupplier) {
this.transformerSupplier = transformerSupplier;
}
@ -49,9 +49,11 @@ public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupp
public static class KStreamFlatTransformProcessor<KIn, VIn, KOut, VOut> extends ContextualProcessor<KIn, VIn, KOut, VOut> {
private final Transformer<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformer;
@SuppressWarnings("deprecation")
private final org.apache.kafka.streams.kstream.Transformer<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformer;
public KStreamFlatTransformProcessor(final Transformer<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformer) {
@SuppressWarnings("deprecation")
public KStreamFlatTransformProcessor(final org.apache.kafka.streams.kstream.Transformer<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformer) {
this.transformer = transformer;
}

View File

@ -38,12 +38,10 @@ import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode.BaseRepartitionNodeBuilder;
@ -1309,7 +1307,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
@Deprecated
public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
public <KR, VR> KStream<KR, VR> transform(final org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
final String name = builder.newProcessorName(TRANSFORM_NAME);
@ -1318,7 +1316,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
@Deprecated
public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
public <KR, VR> KStream<KR, VR> transform(final org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
@ -1327,7 +1325,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
@Deprecated
public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
public <K1, V1> KStream<K1, V1> flatTransform(final org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
final String name = builder.newProcessorName(TRANSFORM_NAME);
@ -1336,7 +1334,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
@Deprecated
public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
public <K1, V1> KStream<K1, V1> flatTransform(final org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
@ -1369,7 +1367,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
@Deprecated
public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
public <VR> KStream<K, VR> transformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return doTransformValues(
@ -1380,7 +1378,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
@Deprecated
public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
public <VR> KStream<K, VR> transformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
@ -1440,7 +1438,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
@Deprecated
public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
public <VR> KStream<K, VR> flatTransformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return doFlatTransformValues(
@ -1451,7 +1449,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
@Deprecated
public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
public <VR> KStream<K, VR> flatTransformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");

View File

@ -17,27 +17,26 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Collections;
import java.util.Set;
public class TransformerSupplierAdapter<KIn, VIn, KOut, VOut> implements TransformerSupplier<KIn, VIn, Iterable<KeyValue<KOut, VOut>>> {
@Deprecated
public class TransformerSupplierAdapter<KIn, VIn, KOut, VOut> implements org.apache.kafka.streams.kstream.TransformerSupplier<KIn, VIn, Iterable<KeyValue<KOut, VOut>>> {
private final TransformerSupplier<KIn, VIn, KeyValue<KOut, VOut>> transformerSupplier;
private final org.apache.kafka.streams.kstream.TransformerSupplier<KIn, VIn, KeyValue<KOut, VOut>> transformerSupplier;
public TransformerSupplierAdapter(final TransformerSupplier<KIn, VIn, KeyValue<KOut, VOut>> transformerSupplier) {
public TransformerSupplierAdapter(final org.apache.kafka.streams.kstream.TransformerSupplier<KIn, VIn, KeyValue<KOut, VOut>> transformerSupplier) {
this.transformerSupplier = transformerSupplier;
}
@Override
public Transformer<KIn, VIn, Iterable<KeyValue<KOut, VOut>>> get() {
return new Transformer<KIn, VIn, Iterable<KeyValue<KOut, VOut>>>() {
public org.apache.kafka.streams.kstream.Transformer<KIn, VIn, Iterable<KeyValue<KOut, VOut>>> get() {
return new org.apache.kafka.streams.kstream.Transformer<KIn, VIn, Iterable<KeyValue<KOut, VOut>>>() {
private final Transformer<KIn, VIn, KeyValue<KOut, VOut>> transformer = transformerSupplier.get();
private final org.apache.kafka.streams.kstream.Transformer<KIn, VIn, KeyValue<KOut, VOut>> transformer = transformerSupplier.get();
@Override
public void init(final ProcessorContext context) {

View File

@ -19,8 +19,6 @@ package org.apache.kafka.streams.processor;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.state.StoreBuilder;
@ -93,16 +91,16 @@ import java.util.Set;
* @see Topology#addProcessor(String, org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, Named, String...)
* @see KStream#transform(TransformerSupplier, String...)
* @see KStream#transform(TransformerSupplier, Named, String...)
* @see KStream#transformValues(ValueTransformerSupplier, String...)
* @see KStream#transformValues(ValueTransformerSupplier, Named, String...)
* @see KStream#transform(org.apache.kafka.streams.kstream.TransformerSupplier, String...)
* @see KStream#transform(org.apache.kafka.streams.kstream.TransformerSupplier, Named, String...)
* @see KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, String...)
* @see KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, Named, String...)
* @see KStream#flatTransform(TransformerSupplier, String...)
* @see KStream#flatTransform(TransformerSupplier, Named, String...)
* @see KStream#flatTransformValues(ValueTransformerSupplier, String...)
* @see KStream#flatTransformValues(ValueTransformerSupplier, Named, String...)
* @see KStream#flatTransform(org.apache.kafka.streams.kstream.TransformerSupplier, String...)
* @see KStream#flatTransform(org.apache.kafka.streams.kstream.TransformerSupplier, Named, String...)
* @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, String...)
* @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...)
* @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...)
* @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, Named, String...)
*/

View File

@ -25,10 +25,6 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
@ -54,6 +50,7 @@ import static org.hamcrest.core.IsEqual.equalTo;
@Tag("integration")
@Timeout(600)
@SuppressWarnings("deprecation")
public class KStreamTransformIntegrationTest {
private StreamsBuilder builder;
private final String topic = "stream";
@ -90,7 +87,7 @@ public class KStreamTransformIntegrationTest {
assertThat(results, equalTo(expected));
}
private class TestTransformer implements Transformer<Integer, Integer, KeyValue<Integer, Integer>> {
private class TestTransformer implements org.apache.kafka.streams.kstream.Transformer<Integer, Integer, KeyValue<Integer, Integer>> {
private KeyValueStore<Integer, Integer> state;
@Override
@ -113,7 +110,6 @@ public class KStreamTransformIntegrationTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldTransform() {
builder.addStateStore(storeBuilder());
@ -132,12 +128,11 @@ public class KStreamTransformIntegrationTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldTransformWithConnectedStoreProvider() {
stream
.transform(new TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>>() {
.transform(new org.apache.kafka.streams.kstream.TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>>() {
@Override
public Transformer<Integer, Integer, KeyValue<Integer, Integer>> get() {
public org.apache.kafka.streams.kstream.Transformer<Integer, Integer, KeyValue<Integer, Integer>> get() {
return new TestTransformer();
}
@ -158,7 +153,7 @@ public class KStreamTransformIntegrationTest {
verifyResult(expected);
}
private class TestFlatTransformer implements Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>> {
private class TestFlatTransformer implements org.apache.kafka.streams.kstream.Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>> {
private KeyValueStore<Integer, Integer> state;
@Override
@ -184,7 +179,6 @@ public class KStreamTransformIntegrationTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldFlatTransform() {
builder.addStateStore(storeBuilder());
@ -215,12 +209,11 @@ public class KStreamTransformIntegrationTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldFlatTransformWithConnectedStoreProvider() {
stream
.flatTransform(new TransformerSupplier<Integer, Integer, Iterable<KeyValue<Integer, Integer>>>() {
.flatTransform(new org.apache.kafka.streams.kstream.TransformerSupplier<Integer, Integer, Iterable<KeyValue<Integer, Integer>>>() {
@Override
public Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>> get() {
public org.apache.kafka.streams.kstream.Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>> get() {
return new TestFlatTransformer();
}
@ -276,7 +269,6 @@ public class KStreamTransformIntegrationTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldTransformValuesWithValueTransformerWithKey() {
builder.addStateStore(storeBuilder());
@ -295,7 +287,6 @@ public class KStreamTransformIntegrationTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldTransformValuesWithValueTransformerWithKeyWithConnectedStoreProvider() {
stream
.transformValues(new ValueTransformerWithKeySupplier<Integer, Integer, Integer>() {
@ -312,7 +303,7 @@ public class KStreamTransformIntegrationTest {
.foreach(accumulateExpected);
}
private class TestValueTransformer implements ValueTransformer<Integer, Integer> {
private class TestValueTransformer implements org.apache.kafka.streams.kstream.ValueTransformer<Integer, Integer> {
private KeyValueStore<Integer, Integer> state;
@Override
@ -334,7 +325,6 @@ public class KStreamTransformIntegrationTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldTransformValuesWithValueTransformerWithoutKey() {
builder.addStateStore(storeBuilder());
@ -353,12 +343,11 @@ public class KStreamTransformIntegrationTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldTransformValuesWithValueTransformerWithoutKeyWithConnectedStoreProvider() {
stream
.transformValues(new ValueTransformerSupplier<Integer, Integer>() {
.transformValues(new org.apache.kafka.streams.kstream.ValueTransformerSupplier<Integer, Integer>() {
@Override
public ValueTransformer<Integer, Integer> get() {
public org.apache.kafka.streams.kstream.ValueTransformer<Integer, Integer> get() {
return new TestValueTransformer();
}
@ -405,7 +394,6 @@ public class KStreamTransformIntegrationTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldFlatTransformValuesWithKey() {
builder.addStateStore(storeBuilder());
@ -436,7 +424,6 @@ public class KStreamTransformIntegrationTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldFlatTransformValuesWithKeyWithConnectedStoreProvider() {
stream
.flatTransformValues(new ValueTransformerWithKeySupplier<Integer, Integer, Iterable<Integer>>() {
@ -474,7 +461,7 @@ public class KStreamTransformIntegrationTest {
verifyResult(expected);
}
private class TestFlatValueTransformer implements ValueTransformer<Integer, Iterable<Integer>> {
private class TestFlatValueTransformer implements org.apache.kafka.streams.kstream.ValueTransformer<Integer, Iterable<Integer>> {
private KeyValueStore<Integer, Integer> state;
@Override
@ -500,7 +487,6 @@ public class KStreamTransformIntegrationTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldFlatTransformValuesWithValueTransformerWithoutKey() {
builder.addStateStore(storeBuilder());
@ -531,12 +517,11 @@ public class KStreamTransformIntegrationTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldFlatTransformValuesWithValueTransformerWithoutKeyWithConnectedStoreProvider() {
stream
.flatTransformValues(new ValueTransformerSupplier<Integer, Iterable<Integer>>() {
.flatTransformValues(new org.apache.kafka.streams.kstream.ValueTransformerSupplier<Integer, Iterable<Integer>>() {
@Override
public ValueTransformer<Integer, Iterable<Integer>> get() {
public org.apache.kafka.streams.kstream.ValueTransformer<Integer, Iterable<Integer>> get() {
return new TestFlatValueTransformer();
}

View File

@ -30,7 +30,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;
@ -336,7 +335,7 @@ public class StandbyTaskEOSIntegrationTest {
);
builder.<Integer, Integer>stream(inputTopic)
.transform(
() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
() -> new org.apache.kafka.streams.kstream.Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
private KeyValueStore<Integer, Integer> store;
@SuppressWarnings("unchecked")
@ -388,8 +387,8 @@ public class StandbyTaskEOSIntegrationTest {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
// need to set to zero to get predictable active/standby task assignments
streamsConfiguration.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0);

View File

@ -24,7 +24,6 @@ import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
@ -52,9 +51,11 @@ import static org.mockito.Mockito.when;
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class AbstractStreamTest {
@SuppressWarnings("deprecation")
@Test
public void testToInternalValueTransformerSupplierSuppliesNewTransformers() {
final ValueTransformerSupplier<?, ?> valueTransformerSupplier = mock(ValueTransformerSupplier.class);
final org.apache.kafka.streams.kstream.ValueTransformerSupplier<?, ?> valueTransformerSupplier =
mock(org.apache.kafka.streams.kstream.ValueTransformerSupplier.class);
when(valueTransformerSupplier.get())
.thenReturn(new NoopValueTransformer<>())
.thenReturn(new NoopValueTransformer<>());

View File

@ -17,8 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransform.KStreamFlatTransformProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
@ -46,13 +44,14 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@SuppressWarnings("deprecation")
public class KStreamFlatTransformTest {
private Number inputKey;
private Number inputValue;
@Mock
private Transformer<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformer;
private org.apache.kafka.streams.kstream.Transformer<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformer;
@Mock
private InternalProcessorContext<Integer, Integer> context;
private InOrder inOrder;
@ -124,8 +123,8 @@ public class KStreamFlatTransformTest {
@Test
public void shouldGetFlatTransformProcessor() {
@SuppressWarnings("unchecked")
final TransformerSupplier<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformerSupplier =
mock(TransformerSupplier.class);
final org.apache.kafka.streams.kstream.TransformerSupplier<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformerSupplier =
mock(org.apache.kafka.streams.kstream.TransformerSupplier.class);
final KStreamFlatTransform<Number, Number, Integer, Integer> processorSupplier =
new KStreamFlatTransform<>(transformerSupplier);

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
@ -43,14 +45,10 @@ import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
@ -64,6 +62,7 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@ -116,8 +115,9 @@ public class KStreamImplTest {
private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
private final MockApiProcessorSupplier<String, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
private final MockApiFixedKeyProcessorSupplier<String, String, Void> fixedKeyProcessorSupplier = new MockApiFixedKeyProcessorSupplier<>();
private final TransformerSupplier<String, String, KeyValue<String, String>> transformerSupplier =
() -> new Transformer<String, String, KeyValue<String, String>>() {
@SuppressWarnings("deprecation")
private final org.apache.kafka.streams.kstream.TransformerSupplier<String, String, KeyValue<String, String>> transformerSupplier =
() -> new org.apache.kafka.streams.kstream.Transformer<String, String, KeyValue<String, String>>() {
@Override
public void init(final ProcessorContext context) {}
@ -129,8 +129,9 @@ public class KStreamImplTest {
@Override
public void close() {}
};
private final TransformerSupplier<String, String, Iterable<KeyValue<String, String>>> flatTransformerSupplier =
() -> new Transformer<String, String, Iterable<KeyValue<String, String>>>() {
@SuppressWarnings("deprecation")
private final org.apache.kafka.streams.kstream.TransformerSupplier<String, String, Iterable<KeyValue<String, String>>> flatTransformerSupplier =
() -> new org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<String, String>>>() {
@Override
public void init(final ProcessorContext context) {}
@ -142,8 +143,9 @@ public class KStreamImplTest {
@Override
public void close() {}
};
private final ValueTransformerSupplier<String, String> valueTransformerSupplier =
() -> new ValueTransformer<String, String>() {
@SuppressWarnings("deprecation")
private final org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, String> valueTransformerSupplier =
() -> new org.apache.kafka.streams.kstream.ValueTransformer<String, String>() {
@Override
public void init(final ProcessorContext context) {}
@ -168,8 +170,9 @@ public class KStreamImplTest {
@Override
public void close() {}
};
private final ValueTransformerSupplier<String, Iterable<String>> flatValueTransformerSupplier =
() -> new ValueTransformer<String, Iterable<String>>() {
@SuppressWarnings("deprecation")
private final org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, Iterable<String>> flatValueTransformerSupplier =
() -> new org.apache.kafka.streams.kstream.ValueTransformer<String, Iterable<String>>() {
@Override
public void init(final ProcessorContext context) {}
@ -1859,7 +1862,7 @@ public class KStreamImplTest {
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowBadTransformerSupplierOnFlatTransform() {
final Transformer<String, String, Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> testStream.flatTransform(() -> transformer)
@ -1870,7 +1873,7 @@ public class KStreamImplTest {
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowBadTransformerSupplierOnFlatTransformWithStores() {
final Transformer<String, String, Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> testStream.flatTransform(() -> transformer, "storeName")
@ -1881,7 +1884,7 @@ public class KStreamImplTest {
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowBadTransformerSupplierOnFlatTransformWithNamed() {
final Transformer<String, String, Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> testStream.flatTransform(() -> transformer, Named.as("flatTransformer"))
@ -1892,7 +1895,7 @@ public class KStreamImplTest {
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowBadTransformerSupplierOnFlatTransformWithNamedAndStores() {
final Transformer<String, String, Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> testStream.flatTransform(() -> transformer, Named.as("flatTransformer"), "storeName")
@ -1993,7 +1996,7 @@ public class KStreamImplTest {
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowBadTransformerSupplierOnTransformValues() {
final ValueTransformer<String, String> transformer = valueTransformerSupplier.get();
final org.apache.kafka.streams.kstream.ValueTransformer<String, String> transformer = valueTransformerSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> testStream.transformValues(() -> transformer)
@ -2004,7 +2007,7 @@ public class KStreamImplTest {
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowBadTransformerSupplierOnTransformValuesWithNamed() {
final ValueTransformer<String, String> transformer = valueTransformerSupplier.get();
final org.apache.kafka.streams.kstream.ValueTransformer<String, String> transformer = valueTransformerSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> testStream.transformValues(() -> transformer, Named.as("transformer"))
@ -2017,7 +2020,7 @@ public class KStreamImplTest {
public void shouldNotAllowNullValueTransformerSupplierOnTransformValues() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.transformValues((ValueTransformerSupplier<Object, Object>) null));
() -> testStream.transformValues((org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Object>) null));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@ -2058,7 +2061,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.transformValues(
(ValueTransformerSupplier<Object, Object>) null,
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Object>) null,
"storeName"));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@ -2080,7 +2083,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.transformValues(
(ValueTransformerSupplier<Object, Object>) null,
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Object>) null,
Named.as("valueTransformer")));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@ -2102,7 +2105,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.transformValues(
(ValueTransformerSupplier<Object, Object>) null,
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Object>) null,
Named.as("valueTransformer"),
"storeName"));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
@ -2262,7 +2265,7 @@ public class KStreamImplTest {
public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValues() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues((ValueTransformerSupplier<Object, Iterable<Object>>) null));
() -> testStream.flatTransformValues((org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Iterable<Object>>) null));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@ -2281,7 +2284,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
(ValueTransformerSupplier<Object, Iterable<Object>>) null,
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Iterable<Object>>) null,
"stateStore"));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@ -2303,7 +2306,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
(ValueTransformerSupplier<Object, Iterable<Object>>) null,
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Iterable<Object>>) null,
Named.as("flatValueTransformer")));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@ -2325,7 +2328,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
(ValueTransformerSupplier<Object, Iterable<Object>>) null,
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Iterable<Object>>) null,
Named.as("flatValueTransformer"),
"stateStore"));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
@ -2678,9 +2681,9 @@ public class KStreamImplTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(input, Serdes.String().serializer(), Serdes.String().serializer());
driver.createInputTopic(input, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic =
driver.createOutputTopic(output, Serdes.String().deserializer(), Serdes.String().deserializer());
driver.createOutputTopic(output, new StringDeserializer(), new StringDeserializer());
inputTopic.pipeInput("A", "01", 5L);
inputTopic.pipeInput("B", "02", 100L);
@ -2761,8 +2764,8 @@ public class KStreamImplTest {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(
input,
Serdes.String().serializer(),
Serdes.String().serializer()
new StringSerializer(),
new StringSerializer()
);
inputTopic.pipeInput("A", "0", 5L);
@ -2852,8 +2855,8 @@ public class KStreamImplTest {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(
input,
Serdes.String().serializer(),
Serdes.String().serializer()
new StringSerializer(),
new StringSerializer()
);
inputTopic.pipeInput("A", "0", 5L);
@ -2908,14 +2911,14 @@ public class KStreamImplTest {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(
input,
Serdes.String().serializer(),
Serdes.String().serializer()
new StringSerializer(),
new StringSerializer()
);
final TestOutputTopic<String, Integer> outputTopic =
driver.createOutputTopic(
output,
Serdes.String().deserializer(),
Serdes.Integer().deserializer()
new StringDeserializer(),
new IntegerDeserializer()
);
inputTopic.pipeInput("A", "0", 5L);
@ -2974,14 +2977,14 @@ public class KStreamImplTest {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(
input,
Serdes.String().serializer(),
Serdes.String().serializer()
new StringSerializer(),
new StringSerializer()
);
final TestOutputTopic<String, Integer> outputTopic =
driver.createOutputTopic(
output,
Serdes.String().deserializer(),
Serdes.Integer().deserializer()
new StringDeserializer(),
new IntegerDeserializer()
);
inputTopic.pipeInput("A", "0", 5L);
@ -3030,7 +3033,7 @@ public class KStreamImplTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(input, Serdes.String().serializer(), Serdes.String().serializer());
driver.createInputTopic(input, new StringSerializer(), new StringSerializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
inputTopic.pipeInput("A", "01");
@ -3089,9 +3092,9 @@ public class KStreamImplTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(input, Serdes.String().serializer(), Serdes.String().serializer());
driver.createInputTopic(input, new StringSerializer(), new StringSerializer());
final TestOutputTopic<Integer, String> outputTopic =
driver.createOutputTopic(output, Serdes.Integer().deserializer(), Serdes.String().deserializer());
driver.createOutputTopic(output, new IntegerDeserializer(), new StringDeserializer());
inputTopic.pipeInput("A", "01", 5L);
inputTopic.pipeInput("B", "02", 100L);
@ -3439,7 +3442,7 @@ public class KStreamImplTest {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestOutputTopic<String, Long> outputTopic =
driver.createOutputTopic(output, Serdes.String().deserializer(), Serdes.Long().deserializer());
driver.createOutputTopic(output, new StringDeserializer(), new LongDeserializer());
inputTopic.pipeInput("A", "green", 10L);
inputTopic.pipeInput("B", "green", 9L);
@ -3504,7 +3507,7 @@ public class KStreamImplTest {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestOutputTopic<String, Integer> outputTopic =
driver.createOutputTopic(output, Serdes.String().deserializer(), Serdes.Integer().deserializer());
driver.createOutputTopic(output, new StringDeserializer(), new IntegerDeserializer());
final KeyValueStore<String, Integer> store = driver.getKeyValueStore(storeName);
inputTopic.pipeInput("A", "green", 10L);
@ -3535,7 +3538,9 @@ public class KStreamImplTest {
private static <K, V> Map<K, V> asMap(final KeyValueStore<K, V> store) {
final HashMap<K, V> result = new HashMap<>();
store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));
try (final KeyValueIterator<K, V> it = store.all()) {
it.forEachRemaining(kv -> result.put(kv.key, kv.value));
}
return result;
}
}

View File

@ -25,8 +25,6 @@ import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.To;
@ -41,17 +39,17 @@ import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
@SuppressWarnings("deprecation")
public class KStreamTransformTest {
private static final String TOPIC_NAME = "topic";
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testTransform() {
final StreamsBuilder builder = new StreamsBuilder();
final TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier =
() -> new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
final org.apache.kafka.streams.kstream.TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier =
() -> new org.apache.kafka.streams.kstream.Transformer<Number, Number, KeyValue<Integer, Integer>>() {
private int total = 0;
@Override
@ -108,13 +106,12 @@ public class KStreamTransformTest {
}
}
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
@Test
public void testTransformWithNewDriverAndPunctuator() {
final StreamsBuilder builder = new StreamsBuilder();
final TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier =
() -> new Transformer<Number, Number, KeyValue<Integer, Integer>>() {
final org.apache.kafka.streams.kstream.TransformerSupplier<Number, Number, KeyValue<Integer, Integer>> transformerSupplier =
() -> new org.apache.kafka.streams.kstream.Transformer<Number, Number, KeyValue<Integer, Integer>>() {
private int total = 0;
@Override

View File

@ -24,8 +24,6 @@ import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.api.Processor;
@ -61,8 +59,8 @@ public class KStreamTransformValuesTest {
public void testTransform() {
final StreamsBuilder builder = new StreamsBuilder();
final ValueTransformerSupplier<Number, Integer> valueTransformerSupplier =
() -> new ValueTransformer<Number, Integer>() {
final org.apache.kafka.streams.kstream.ValueTransformerSupplier<Number, Integer> valueTransformerSupplier =
() -> new org.apache.kafka.streams.kstream.ValueTransformer<Number, Integer>() {
private int total = 0;
@Override

View File

@ -17,8 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
@ -41,13 +39,16 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@SuppressWarnings("deprecation")
public class TransformerSupplierAdapterTest {
private ProcessorContext context = mock(ProcessorContext.class);
@SuppressWarnings("unchecked")
private Transformer<String, String, KeyValue<Integer, Integer>> transformer = mock(Transformer.class);
private org.apache.kafka.streams.kstream.Transformer<String, String, KeyValue<Integer, Integer>> transformer =
mock(org.apache.kafka.streams.kstream.Transformer.class);
@SuppressWarnings("unchecked")
private TransformerSupplier<String, String, KeyValue<Integer, Integer>> transformerSupplier = mock(TransformerSupplier.class);
private org.apache.kafka.streams.kstream.TransformerSupplier<String, String, KeyValue<Integer, Integer>> transformerSupplier =
mock(org.apache.kafka.streams.kstream.TransformerSupplier.class);
@SuppressWarnings("unchecked")
private Set<StoreBuilder<?>> stores = mock(Set.class);
@ -60,7 +61,7 @@ public class TransformerSupplierAdapterTest {
final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
new TransformerSupplierAdapter<>(transformerSupplier);
final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
adaptedTransformer.init(context);
verify(transformer).init(context);
@ -72,7 +73,7 @@ public class TransformerSupplierAdapterTest {
final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
new TransformerSupplierAdapter<>(transformerSupplier);
final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
adaptedTransformer.close();
verify(transformer).close();
@ -94,7 +95,7 @@ public class TransformerSupplierAdapterTest {
final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
new TransformerSupplierAdapter<>(transformerSupplier);
final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
final Iterator<KeyValue<Integer, Integer>> iterator = adaptedTransformer.transform(key, value).iterator();
assertThat(iterator.hasNext(), equalTo(true));
@ -109,7 +110,7 @@ public class TransformerSupplierAdapterTest {
final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
new TransformerSupplierAdapter<>(transformerSupplier);
final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
final Iterator<KeyValue<Integer, Integer>> iterator = adaptedTransformer.transform(key, value).iterator();
assertThat(iterator.hasNext(), equalTo(false));
@ -118,20 +119,23 @@ public class TransformerSupplierAdapterTest {
@Test
public void shouldAlwaysGetNewAdapterTransformer() {
@SuppressWarnings("unchecked")
final Transformer<String, String, KeyValue<Integer, Integer>> transformer1 = mock(Transformer.class);
final org.apache.kafka.streams.kstream.Transformer<String, String, KeyValue<Integer, Integer>> transformer1 =
mock(org.apache.kafka.streams.kstream.Transformer.class);
@SuppressWarnings("unchecked")
final Transformer<String, String, KeyValue<Integer, Integer>> transformer2 = mock(Transformer.class);
final org.apache.kafka.streams.kstream.Transformer<String, String, KeyValue<Integer, Integer>> transformer2 =
mock(org.apache.kafka.streams.kstream.Transformer.class);
@SuppressWarnings("unchecked")
final Transformer<String, String, KeyValue<Integer, Integer>> transformer3 = mock(Transformer.class);
final org.apache.kafka.streams.kstream.Transformer<String, String, KeyValue<Integer, Integer>> transformer3 =
mock(org.apache.kafka.streams.kstream.Transformer.class);
when(transformerSupplier.get()).thenReturn(transformer1).thenReturn(transformer2).thenReturn(transformer3);
final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
new TransformerSupplierAdapter<>(transformerSupplier);
final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adapterTransformer1 = adapter.get();
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adapterTransformer1 = adapter.get();
adapterTransformer1.init(context);
final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adapterTransformer2 = adapter.get();
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adapterTransformer2 = adapter.get();
adapterTransformer2.init(context);
final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adapterTransformer3 = adapter.get();
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adapterTransformer3 = adapter.get();
adapterTransformer3.init(context);
assertThat(adapterTransformer1, not(sameInstance(adapterTransformer2)));

View File

@ -18,7 +18,6 @@ package org.apache.kafka.test;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
@ -45,7 +44,8 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext<Object, Object> {
@SuppressWarnings("deprecation")
public class MockInternalProcessorContext extends org.apache.kafka.streams.processor.MockProcessorContext implements InternalProcessorContext<Object, Object> {
private final Map<String, StateRestoreCallback> restoreCallbacks = new LinkedHashMap<>();
private ProcessorNode currentNode;

View File

@ -16,10 +16,10 @@
*/
package org.apache.kafka.test;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
public class NoopValueTransformer<K, V, VR> implements ValueTransformer<V, VR> {
@SuppressWarnings("deprecation")
public class NoopValueTransformer<K, V, VR> implements org.apache.kafka.streams.kstream.ValueTransformer<V, VR> {
@Override
public void init(final ProcessorContext context) {
}

View File

@ -86,10 +86,16 @@ private[scala] object FunctionsCompatConversions {
def asInitializer: Initializer[VA] = () => f()
}
@deprecated(
since = "4.0.0"
)
implicit class TransformerSupplierFromFunction[K, V, VO](val f: () => Transformer[K, V, VO]) extends AnyVal {
def asTransformerSupplier: TransformerSupplier[K, V, VO] = () => f()
}
@deprecated(
since = "4.0.0"
)
implicit class TransformerSupplierAsJava[K, V, VO](val supplier: TransformerSupplier[K, V, Iterable[VO]])
extends AnyVal {
def asJava: TransformerSupplier[K, V, JIterable[VO]] = () => {
@ -101,6 +107,10 @@ private[scala] object FunctionsCompatConversions {
}
}
}
@deprecated(
since = "4.0.0"
)
implicit class ValueTransformerSupplierAsJava[V, VO](val supplier: ValueTransformerSupplier[V, Iterable[VO]])
extends AnyVal {
def asJava: ValueTransformerSupplier[V, JIterable[VO]] = () => {
@ -112,6 +122,10 @@ private[scala] object FunctionsCompatConversions {
}
}
}
@deprecated(
since = "4.0.0"
)
implicit class ValueTransformerSupplierWithKeyAsJava[K, V, VO](
val supplier: ValueTransformerWithKeySupplier[K, V, Iterable[VO]]
) extends AnyVal {

View File

@ -28,8 +28,6 @@ import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@ -46,7 +44,8 @@ import java.util.Properties;
/**
* {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
* {@link Transformer}, and {@link ValueTransformer} implementations.
* {@link org.apache.kafka.streams.kstream.Transformer}, and {@link org.apache.kafka.streams.kstream.ValueTransformer}
* implementations.
* <p>
* The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
* tests that serve as example usage.
@ -55,8 +54,10 @@ import java.util.Properties;
* It simply captures any data it witnesses.
* If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
* {@link Topology} and using the {@link TopologyTestDriver}.
*
* @deprecated Since 4.0. Use {@link org.apache.kafka.streams.processor.api.MockProcessorContext} instead.
*/
@SuppressWarnings("deprecation") // not deprecating old PAPI Context, since it is still in use by Transformers.
@Deprecated
public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
// Immutable fields ================================================
private final StreamsMetricsImpl metrics;

View File

@ -25,8 +25,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.PunctuationType;
@ -56,8 +54,8 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
/**
* {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
* {@link Transformer}, and {@link ValueTransformer} implementations.
* {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor}
* implementations.
* <p>
* The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
* tests that serve as example usage.

View File

@ -18,8 +18,6 @@ package org.apache.kafka.streams;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@ -53,13 +51,13 @@ public class MockProcessorContextTest {
}
};
final MockProcessorContext context = new MockProcessorContext();
final org.apache.kafka.streams.processor.MockProcessorContext context = new org.apache.kafka.streams.processor.MockProcessorContext();
processor.init(context);
processor.process("foo", 5L);
processor.process("barbaz", 50L);
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
final Iterator<org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
assertFalse(forwarded.hasNext());
@ -78,14 +76,14 @@ public class MockProcessorContextTest {
}
};
final MockProcessorContext context = new MockProcessorContext();
final org.apache.kafka.streams.processor.MockProcessorContext context = new org.apache.kafka.streams.processor.MockProcessorContext();
processor.init(context);
processor.process("foo", 5L);
processor.process("barbaz", 50L);
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
final Iterator<org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
assertFalse(forwarded.hasNext());
@ -111,7 +109,7 @@ public class MockProcessorContextTest {
}
};
final MockProcessorContext context = new MockProcessorContext();
final org.apache.kafka.streams.processor.MockProcessorContext context = new org.apache.kafka.streams.processor.MockProcessorContext();
processor.init(context);
@ -119,17 +117,17 @@ public class MockProcessorContextTest {
processor.process("barbaz", 50L);
{
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
final Iterator<org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
final CapturedForward forward1 = forwarded.next();
final org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward forward1 = forwarded.next();
assertEquals(new KeyValue<>("start", -1L), forward1.keyValue());
assertNull(forward1.childName());
final CapturedForward forward2 = forwarded.next();
final org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward forward2 = forwarded.next();
assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue());
assertEquals("george", forward2.childName());
final CapturedForward forward3 = forwarded.next();
final org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward forward3 = forwarded.next();
assertEquals(new KeyValue<>("barbaz50", 56L), forward3.keyValue());
assertEquals("pete", forward3.childName());
@ -137,21 +135,21 @@ public class MockProcessorContextTest {
}
{
final Iterator<CapturedForward> forwarded = context.forwarded("george").iterator();
final Iterator<org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward> forwarded = context.forwarded("george").iterator();
assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
assertFalse(forwarded.hasNext());
}
{
final Iterator<CapturedForward> forwarded = context.forwarded("pete").iterator();
final Iterator<org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward> forwarded = context.forwarded("pete").iterator();
assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
assertFalse(forwarded.hasNext());
}
{
final Iterator<CapturedForward> forwarded = context.forwarded("steve").iterator();
final Iterator<org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward> forwarded = context.forwarded("steve").iterator();
assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
assertFalse(forwarded.hasNext());
}
@ -170,7 +168,7 @@ public class MockProcessorContextTest {
}
};
final MockProcessorContext context = new MockProcessorContext();
final org.apache.kafka.streams.processor.MockProcessorContext context = new org.apache.kafka.streams.processor.MockProcessorContext();
processor.init(context);
@ -199,7 +197,7 @@ public class MockProcessorContextTest {
}
};
final MockProcessorContext context = new MockProcessorContext();
final org.apache.kafka.streams.processor.MockProcessorContext context = new org.apache.kafka.streams.processor.MockProcessorContext();
final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("my-state"),
@ -241,7 +239,7 @@ public class MockProcessorContextTest {
}
};
final MockProcessorContext context = new MockProcessorContext(config);
final org.apache.kafka.streams.processor.MockProcessorContext context = new org.apache.kafka.streams.processor.MockProcessorContext(config);
processor.init(context);
try {
@ -256,7 +254,7 @@ public class MockProcessorContextTest {
{
processor.process("foo", 5L);
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
final Iterator<org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
@ -277,7 +275,7 @@ public class MockProcessorContextTest {
{
processor.process("bar", 50L);
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
final Iterator<org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
@ -297,7 +295,7 @@ public class MockProcessorContextTest {
{
processor.process("baz", 500L);
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
final Iterator<org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
assertEquals(new KeyValue<>("topic", "t2"), forwarded.next().keyValue());
@ -330,11 +328,11 @@ public class MockProcessorContextTest {
}
};
final MockProcessorContext context = new MockProcessorContext();
final org.apache.kafka.streams.processor.MockProcessorContext context = new org.apache.kafka.streams.processor.MockProcessorContext();
processor.init(context);
final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
final org.apache.kafka.streams.processor.MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
assertEquals(1000L, capturedPunctuator.getIntervalMs());
assertEquals(PunctuationType.WALL_CLOCK_TIME, capturedPunctuator.getType());
assertFalse(capturedPunctuator.cancelled());
@ -349,18 +347,18 @@ public class MockProcessorContextTest {
public void fullConstructorShouldSetAllExpectedAttributes() {
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class);
final File dummyFile = new File("");
final MockProcessorContext context = new MockProcessorContext(config, new TaskId(1, 1), dummyFile);
final org.apache.kafka.streams.processor.MockProcessorContext context = new org.apache.kafka.streams.processor.MockProcessorContext(config, new TaskId(1, 1), dummyFile);
assertEquals("testFullConstructor", context.applicationId());
assertEquals(new TaskId(1, 1), context.taskId());
assertEquals("testFullConstructor", context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG));
assertEquals("testFullConstructor", context.appConfigsWithPrefix("application.").get("id"));
assertEquals(Serdes.String().getClass(), context.keySerde().getClass());
assertEquals(Serdes.Long().getClass(), context.valueSerde().getClass());
assertEquals(Serdes.StringSerde.class, context.keySerde().getClass());
assertEquals(Serdes.LongSerde.class, context.valueSerde().getClass());
assertEquals(dummyFile, context.stateDir());
}
}