mirror of https://github.com/apache/kafka.git
KAFKA-12829: Remove deprecated KStream.process() for old Processor API (#18088)
Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in: parent de2ccb5789, commit ab2facca58
@@ -2967,209 +2967,6 @@ public interface KStream<K, V> {
                                           final ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> valueJoiner,
                                           final Named named);
 
-    /**
-     * Process all records in this stream, one record at a time, by applying a
-     * {@link org.apache.kafka.streams.processor.Processor} (provided by the given
-     * {@link org.apache.kafka.streams.processor.ProcessorSupplier}).
-     * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}).
-     * If you choose not to attach one, this operation is similar to the stateless {@link #foreach(ForeachAction)}
-     * but allows access to the {@code ProcessorContext} and record metadata.
-     * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
-     * can be observed and additional periodic actions can be performed.
-     * Note that this is a terminal operation that returns void.
-     * <p>
-     * In order for the processor to use state stores, the stores must be added to the topology and connected to the
-     * processor using at least one of two strategies (though it's not required to connect global state stores; read-only
-     * access to global state stores is available by default).
-     * <p>
-     * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
-     * and specify the store names via {@code stateStoreNames} so they will be connected to the processor.
-     * <pre>{@code
-     * // create store
-     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * inputStream.process(new ProcessorSupplier() {
-     *     public Processor get() {
-     *         return new MyProcessor();
-     *     }
-     * }, "myProcessorState");
-     * }</pre>
-     * The second strategy is for the given {@link org.apache.kafka.streams.processor.ProcessorSupplier}
-     * to implement {@link ConnectedStoreProvider#stores()},
-     * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the processor.
-     * <pre>{@code
-     * class MyProcessorSupplier implements ProcessorSupplier {
-     *     // supply processor
-     *     Processor get() {
-     *         return new MyProcessor();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the associated processor
-     *     // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
-     *     Set<StoreBuilder> stores() {
-     *         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
-     *                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
-     *                         Serdes.String(),
-     *                         Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * inputStream.process(new MyProcessorSupplier());
-     * }</pre>
-     * <p>
-     * With either strategy, within the {@link org.apache.kafka.streams.processor.Processor},
-     * the state is obtained via the {@link org.apache.kafka.streams.processor.ProcessorContext}.
-     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
-     * a schedule must be registered.
-     * <pre>{@code
-     * class MyProcessor implements Processor {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myProcessorState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     void process(K key, V value) {
-     *         // can access this.state
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }</pre>
-     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
-     * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}.
-     *
-     * @param processorSupplier an instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier}
-     *                          that generates a newly constructed {@link org.apache.kafka.streams.processor.Processor}.
-     *                          The supplier should always generate a new instance. Creating a single
-     *                          {@link org.apache.kafka.streams.processor.Processor} object
-     *                          and returning the same object reference in
-     *                          {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} is a
-     *                          violation of the supplier pattern and leads to runtime exceptions.
-     * @param stateStoreNames   the names of the state stores used by the processor; not required if the supplier
-     *                          implements {@link ConnectedStoreProvider#stores()}
-     * @see #foreach(ForeachAction)
-     * @deprecated Since 3.0. Use {@link KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, java.lang.String...)} instead.
-     */
-    @Deprecated
-    void process(final org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
-                 final String... stateStoreNames);
-
-    /**
-     * Process all records in this stream, one record at a time, by applying a
-     * {@link org.apache.kafka.streams.processor.Processor} (provided by the given
-     * {@link org.apache.kafka.streams.processor.ProcessorSupplier}).
-     * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #foreach(ForeachAction)}).
-     * If you choose not to attach one, this operation is similar to the stateless {@link #foreach(ForeachAction)}
-     * but allows access to the {@code ProcessorContext} and record metadata.
-     * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
-     * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
-     * can be observed and additional periodic actions can be performed.
-     * Note that this is a terminal operation that returns void.
-     * <p>
-     * In order for the processor to use state stores, the stores must be added to the topology and connected to the
-     * processor using at least one of two strategies (though it's not required to connect global state stores; read-only
-     * access to global state stores is available by default).
-     * <p>
-     * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
-     * and specify the store names via {@code stateStoreNames} so they will be connected to the processor.
-     * <pre>{@code
-     * // create store
-     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * inputStream.process(new ProcessorSupplier() {
-     *     public Processor get() {
-     *         return new MyProcessor();
-     *     }
-     * }, "myProcessorState");
-     * }</pre>
-     * The second strategy is for the given {@link org.apache.kafka.streams.processor.ProcessorSupplier}
-     * to implement {@link ConnectedStoreProvider#stores()},
-     * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the processor.
-     * <pre>{@code
-     * class MyProcessorSupplier implements ProcessorSupplier {
-     *     // supply processor
-     *     Processor get() {
-     *         return new MyProcessor();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the associated processor
-     *     // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
-     *     Set<StoreBuilder> stores() {
-     *         StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
-     *                 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
-     *                         Serdes.String(),
-     *                         Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * inputStream.process(new MyProcessorSupplier());
-     * }</pre>
-     * <p>
-     * With either strategy, within the {@link org.apache.kafka.streams.processor.Processor},
-     * the state is obtained via the {@link org.apache.kafka.streams.processor.ProcessorContext}.
-     * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
-     * a schedule must be registered.
-     * <pre>{@code
-     * class MyProcessor implements Processor {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myProcessorState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     void process(K key, V value) {
-     *         // can access this.state
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }</pre>
-     * Even if any upstream operation was key-changing, no auto-repartition is triggered.
-     * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}.
-     *
-     * @param processorSupplier an instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier}
-     *                          that generates a newly constructed {@link org.apache.kafka.streams.processor.Processor}.
-     *                          The supplier should always generate a new instance. Creating a single
-     *                          {@link org.apache.kafka.streams.processor.Processor} object
-     *                          and returning the same object reference in
-     *                          {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} is a
-     *                          violation of the supplier pattern and leads to runtime exceptions.
-     * @param named             a {@link Named} config used to name the processor in the topology
-     * @param stateStoreNames   the names of the state stores used by the processor
-     * @see #foreach(ForeachAction)
-     * @deprecated Since 3.0. Use {@link KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, org.apache.kafka.streams.kstream.Named, java.lang.String...)} instead.
-     */
-    @Deprecated
-    void process(final org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
-                 final Named named,
-                 final String... stateStoreNames);
-
     /**
      * Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given
      * {@link ProcessorSupplier}).
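Both removed overloads point to the record-based replacement introduced by KIP-478. A rough migration sketch follows; the class names MyProcessor/MyOldProcessor and the store name "myProcessorState" are illustrative, not taken from this commit:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// New-API processor: key and value arrive as a Record, and the context is
// typed on the output key/value types (Void, Void for a terminal processor
// that forwards nothing downstream).
class MyProcessor implements Processor<String, String, Void, Void> {
    private KeyValueStore<String, Integer> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        store = context.getStateStore("myProcessorState");
    }

    @Override
    public void process(final Record<String, String> record) {
        store.put(record.key(), record.value().length());
    }
}

// Before (removed by this commit):
//   inputStream.process(() -> new MyOldProcessor(), "myProcessorState");
// After:
//   inputStream.process(MyProcessor::new, "myProcessorState");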
@@ -1199,35 +1199,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K, V> {
             builder);
     }
 
-    @Override
-    @Deprecated
-    public void process(final org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
-                        final String... stateStoreNames) {
-        process(processorSupplier, Named.as(builder.newProcessorName(PROCESSOR_NAME)), stateStoreNames);
-    }
-
-    @Override
-    @Deprecated
-    public void process(final org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
-                        final Named named,
-                        final String... stateStoreNames) {
-        Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
-        Objects.requireNonNull(named, "named can't be null");
-        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
-        ApiUtils.checkSupplier(processorSupplier);
-        for (final String stateStoreName : stateStoreNames) {
-            Objects.requireNonNull(stateStoreName, "stateStoreNames can't be null");
-        }
-
-        final String name = new NamedInternal(named).name();
-        final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
-            name,
-            new ProcessorParameters<>(processorSupplier, name),
-            stateStoreNames);
-
-        builder.addGraphNode(graphNode, processNode);
-    }
-
     @Override
     public <KOut, VOut> KStream<KOut, VOut> process(
         final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
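Note that the removed implementation ran the supplier through the same ApiUtils.checkSupplier validation as the remaining overloads. A minimal sketch of what such a check does, assuming roughly this behavior (the real ApiUtils signature and message text may differ):

import java.util.function.Supplier;

// Sketch of a supplier-pattern check: two calls to get() must yield two
// distinct instances; a supplier that hands out a shared object is rejected.
final class SupplierCheck {
    static void checkSupplier(final Supplier<?> supplier) {
        if (supplier.get() == supplier.get()) {
            throw new IllegalArgumentException(
                supplier.getClass().getName()
                    + " returns the same object reference from get(); "
                    + "a supplier must return a new instance on every call");
        }
    }
}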
@@ -21,7 +21,6 @@ import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Set;
@@ -38,26 +37,12 @@
  */
 public class ProcessorParameters<KIn, VIn, KOut, VOut> {
 
-    // During the transition to KIP-478, we capture arguments passed from the old API to simplify
-    // the performance of casts that we still need to perform. This will eventually be removed.
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier;
     private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
     private final FixedKeyProcessorSupplier<KIn, VIn, VOut> fixedKeyProcessorSupplier;
     private final String processorName;
 
-    @SuppressWarnings("deprecation") // Old PAPI compatibility.
-    public ProcessorParameters(final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> processorSupplier,
-                               final String processorName) {
-        oldProcessorSupplier = processorSupplier;
-        this.processorSupplier = () -> ProcessorAdapter.adapt(processorSupplier.get());
-        fixedKeyProcessorSupplier = null;
-        this.processorName = processorName;
-    }
-
     public ProcessorParameters(final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier,
                                final String processorName) {
-        oldProcessorSupplier = null;
         this.processorSupplier = processorSupplier;
         fixedKeyProcessorSupplier = null;
         this.processorName = processorName;
@@ -65,7 +50,6 @@
 
     public ProcessorParameters(final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier,
                                final String processorName) {
-        oldProcessorSupplier = null;
         this.processorSupplier = null;
         fixedKeyProcessorSupplier = processorSupplier;
         this.processorName = processorName;
@@ -109,14 +93,6 @@
                 }
             }
         }
-
-        // temporary hack until KIP-478 is fully implemented
-        // Old PAPI. Needs to be migrated.
-        if (oldProcessorSupplier != null && oldProcessorSupplier.stores() != null) {
-            for (final StoreBuilder<?> storeBuilder : oldProcessorSupplier.stores()) {
-                topologyBuilder.addStateStore(storeBuilder, processorName);
-            }
-        }
     }
 
     public String processorName() {
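With the old-PAPI constructor gone, the oldProcessorSupplier field and the ProcessorAdapter bridge become dead code. Conceptually, the deleted `() -> ProcessorAdapter.adapt(processorSupplier.get())` wrapped each old processor in a new-API one. A simplified sketch of that adaptation (not the literal internal ProcessorAdapter, which also adapts the ProcessorContext handed to init()):

// Simplified sketch: adapt an old-style processor to the KIP-478 interface.
// init() is omitted here; the real adapter forwards it with an adapted context.
final class OldApiBridge {
    static <K, V> org.apache.kafka.streams.processor.api.Processor<K, V, Object, Object> adapt(
            final org.apache.kafka.streams.processor.Processor<K, V> delegate) {
        return new org.apache.kafka.streams.processor.api.Processor<K, V, Object, Object>() {
            @Override
            public void process(final org.apache.kafka.streams.processor.api.Record<K, V> record) {
                // The old API received the key and value as separate arguments.
                delegate.process(record.key(), record.value());
            }

            @Override
            public void close() {
                delegate.close();
            }
        };
    }
}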
@@ -49,12 +49,12 @@ import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
-import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
 import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -1810,9 +1810,8 @@
         }
     }
 
-    @SuppressWarnings("deprecation")
     @Test
-    public void shouldProcessWithOldProcessorAndState() {
+    public void shouldProcessWithProcessorAndState() {
         final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
 
         final StreamsBuilder builder = new StreamsBuilder();
@@ -1826,16 +1825,19 @@
         ));
 
         builder.stream(input, consumed)
-            .process(() -> new org.apache.kafka.streams.processor.Processor<String, String>() {
+            .process(() -> new Processor<String, String, String, String>() {
                 private KeyValueStore<String, Integer> sumStore;
 
                 @Override
-                public void init(final ProcessorContext context) {
+                public void init(final ProcessorContext<String, String> context) {
                     this.sumStore = context.getStateStore("sum");
                 }
 
                 @Override
-                public void process(final String key, final String value) {
+                public void process(final Record<String, String> record) {
+                    final String key = record.key();
+                    final String value = record.value();
+
                     final Integer counter = sumStore.get(key);
                     if (counter == null) {
                         sumStore.putIfAbsent(key, value.length());
@@ -1847,10 +1849,6 @@
                     }
                 }
             }
-
-                @Override
-                public void close() {
-                }
             }, Named.as("p"), "sum");
 
         final String topologyDescription = builder.build().describe().toString();
@@ -1889,9 +1887,8 @@
         }
     }
 
-    @SuppressWarnings("deprecation")
     @Test
-    public void shouldBindStateWithOldProcessorSupplier() {
+    public void shouldBindStateWithProcessorSupplier() {
        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
 
         final StreamsBuilder builder = new StreamsBuilder();
@@ -1899,20 +1896,23 @@
         final String input = "input";
 
         builder.stream(input, consumed)
-            .process(new org.apache.kafka.streams.processor.ProcessorSupplier<String, String>() {
+            .process(new ProcessorSupplier<String, String, String, String>() {
 
                 @Override
-                public org.apache.kafka.streams.processor.Processor<String, String> get() {
-                    return new org.apache.kafka.streams.processor.Processor<String, String>() {
+                public Processor<String, String, String, String> get() {
+                    return new Processor<>() {
                         private KeyValueStore<String, Integer> sumStore;
 
                         @Override
-                        public void init(final ProcessorContext context) {
+                        public void init(final ProcessorContext<String, String> context) {
                             this.sumStore = context.getStateStore("sum");
                         }
 
                         @Override
-                        public void process(final String key, final String value) {
+                        public void process(final Record<String, String> record) {
+                            final String key = record.key();
+                            final String value = record.value();
+
                             final Integer counter = sumStore.get(key);
                             if (counter == null) {
                                 sumStore.putIfAbsent(key, value.length());
@@ -1924,14 +1924,9 @@
                                 }
                             }
                         }
-
-                        @Override
-                        public void close() {
-                        }
                     };
                 }
 
-                @SuppressWarnings("unchecked")
                 @Override
                 public Set<StoreBuilder<?>> stores() {
                     final Set<StoreBuilder<?>> stores = new HashSet<>();
@@ -1980,72 +1975,6 @@
         }
     }
 
-    @Test
-    public void shouldBindStateWithOldProcessor() {
-        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
-
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final String input = "input";
-        final String output = "output";
-
-        builder.stream(input, consumed)
-            .process(() -> new ContextualProcessor<String, String, String, Integer>() {
-                @Override
-                public void process(final Record<String, String> record) {
-                    context().forward(record.withValue(record.value().length()));
-                }
-            }, Named.as("p"))
-            .to(output, Produced.valueSerde(Serdes.Integer()));
-
-        final String topologyDescription = builder.build().describe().toString();
-
-        assertThat(
-            topologyDescription,
-            equalTo("Topologies:\n" +
-                "   Sub-topology: 0\n" +
-                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
-                "      --> p\n" +
-                "    Processor: p (stores: [])\n" +
-                "      --> KSTREAM-SINK-0000000001\n" +
-                "      <-- KSTREAM-SOURCE-0000000000\n" +
-                "    Sink: KSTREAM-SINK-0000000001 (topic: output)\n" +
-                "      <-- p\n\n")
-        );
-
-        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
-            final TestInputTopic<String, String> inputTopic =
-                driver.createInputTopic(
-                    input,
-                    new StringSerializer(),
-                    new StringSerializer()
-                );
-            final TestOutputTopic<String, Integer> outputTopic =
-                driver.createOutputTopic(
-                    output,
-                    new StringDeserializer(),
-                    new IntegerDeserializer()
-                );
-
-            inputTopic.pipeInput("A", "0", 5L);
-            inputTopic.pipeInput("B", "00", 100L);
-            inputTopic.pipeInput("C", "000", 0L);
-            inputTopic.pipeInput("D", "0000", 0L);
-            inputTopic.pipeInput("A", "00000", 10L);
-            inputTopic.pipeInput("A", "000000", 8L);
-
-            final List<TestRecord<String, Integer>> outputExpectRecords = new ArrayList<>();
-            outputExpectRecords.add(new TestRecord<>("A", 1, Instant.ofEpochMilli(5L)));
-            outputExpectRecords.add(new TestRecord<>("B", 2, Instant.ofEpochMilli(100L)));
-            outputExpectRecords.add(new TestRecord<>("C", 3, Instant.ofEpochMilli(0L)));
-            outputExpectRecords.add(new TestRecord<>("D", 4, Instant.ofEpochMilli(0L)));
-            outputExpectRecords.add(new TestRecord<>("A", 5, Instant.ofEpochMilli(10L)));
-            outputExpectRecords.add(new TestRecord<>("A", 6, Instant.ofEpochMilli(8L)));
-
-            assertEquals(outputTopic.readRecordsToList(), outputExpectRecords);
-        }
-    }
 
     @Test
     public void shouldProcessValues() {
         final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
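The deleted test overlapped with existing coverage of the new API; the forwarding pattern it exercised is unchanged. For reference, a minimal new-API processor that maps each value to its length and forwards it (the class name is illustrative):

import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Record;

// ContextualProcessor keeps the context received in init() so that
// subclasses can call context() directly.
class ValueLengthProcessor extends ContextualProcessor<String, String, String, Integer> {
    @Override
    public void process(final Record<String, String> record) {
        // withValue() re-types the record's value; key and timestamp are preserved.
        context().forward(record.withValue(record.value().length()));
    }
}

// Usage, mirroring the deleted test:
//   stream.process(ValueLengthProcessor::new, Named.as("p"))
//         .to(output, Produced.valueSerde(Serdes.Integer()));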
@@ -483,50 +483,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   def toTable(named: Named, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
     new KTable(inner.toTable(named, materialized))
 
-  /**
-   * Process all records in this stream, one record at a time, by applying a `Processor` (provided by the given
-   * `processorSupplier`).
-   * In order to assign a state store, the store must be created and added via `addStateStore` before it can be
-   * connected to the `Processor`.
-   * It's not required to connect global state stores that are added via `addGlobalStore`;
-   * read-only access to global state stores is available by default.
-   *
-   * @param processorSupplier a function that generates a `org.apache.kafka.streams.processor.Processor`
-   * @param stateStoreNames   the names of the state stores used by the processor
-   * @see `org.apache.kafka.streams.kstream.KStream#process`
-   */
-  @deprecated(since = "3.0", message = "Use process(ProcessorSupplier, String*) instead.")
-  def process(
-    processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
-    stateStoreNames: String*
-  ): Unit = {
-    val processorSupplierJ: org.apache.kafka.streams.processor.ProcessorSupplier[K, V] = () => processorSupplier()
-    inner.process(processorSupplierJ, stateStoreNames: _*)
-  }
-
-  /**
-   * Process all records in this stream, one record at a time, by applying a `Processor` (provided by the given
-   * `processorSupplier`).
-   * In order to assign a state store, the store must be created and added via `addStateStore` before it can be
-   * connected to the `Processor`.
-   * It's not required to connect global state stores that are added via `addGlobalStore`;
-   * read-only access to global state stores is available by default.
-   *
-   * @param processorSupplier a function that generates a `org.apache.kafka.streams.processor.Processor`
-   * @param named             a [[Named]] config used to name the processor in the topology
-   * @param stateStoreNames   the names of the state stores used by the processor
-   * @see `org.apache.kafka.streams.kstream.KStream#process`
-   */
-  @deprecated(since = "3.0", message = "Use process(ProcessorSupplier, Named, String*) instead.")
-  def process(
-    processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
-    named: Named,
-    stateStoreNames: String*
-  ): Unit = {
-    val processorSupplierJ: org.apache.kafka.streams.processor.ProcessorSupplier[K, V] = () => processorSupplier()
-    inner.process(processorSupplierJ, named, stateStoreNames: _*)
-  }
 
   /**
    * Process all records in this stream, one record at a time, by applying a `Processor` (provided by the given
    * `processorSupplier`).