KAFKA-13821: Update Kafka Streams WordCount demo to new Processor API (#12139)

https://issues.apache.org/jira/browse/KAFKA-13821

Reviewers: Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>, Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
CHUN-HAO TANG 2022-06-29 09:39:32 +08:00 committed by GitHub
parent 025e47b833
commit 6ac7f4ea8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 21 deletions

View File

@ -25,8 +25,11 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@ -63,15 +66,15 @@ import java.util.concurrent.CountDownLatch;
*/
public final class WordCountTransformerDemo {
static class MyTransformerSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {
static class MyProcessorSupplier implements ProcessorSupplier<String, String, String, String> {
@Override
public Transformer<String, String, KeyValue<String, String>> get() {
return new Transformer<String, String, KeyValue<String, String>>() {
public Processor<String, String, String, String> get() {
return new Processor<String, String, String, String>() {
private KeyValueStore<String, Integer> kvStore;
@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<String, String> context) {
context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
System.out.println("----------- " + timestamp + " ----------- ");
@ -80,8 +83,7 @@ public final class WordCountTransformerDemo {
final KeyValue<String, Integer> entry = iter.next();
System.out.println("[" + entry.key + ", " + entry.value + "]");
context.forward(entry.key, entry.value.toString());
context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
}
}
});
@ -89,8 +91,8 @@ public final class WordCountTransformerDemo {
}
@Override
public KeyValue<String, String> transform(final String dummy, final String line) {
final String[] words = line.toLowerCase(Locale.getDefault()).split("\\W+");
public void process(final Record<String, String> record) {
final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");
for (final String word : words) {
final Integer oldValue = this.kvStore.get(word);
@ -101,8 +103,6 @@ public final class WordCountTransformerDemo {
this.kvStore.put(word, oldValue + 1);
}
}
return null;
}
@Override
@ -119,7 +119,6 @@ public final class WordCountTransformerDemo {
}
}
@SuppressWarnings("deprecation")
public static void main(final String[] args) throws IOException {
final Properties props = new Properties();
if (args != null && args.length > 0) {
@ -142,8 +141,8 @@ public final class WordCountTransformerDemo {
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("streams-plaintext-input")
.transform(new MyTransformerSupplier())
.to("streams-wordcount-processor-output");
.process(new MyProcessorSupplier())
.to("streams-wordcount-processor-output");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);

View File

@ -16,13 +16,13 @@
*/
package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.StoreBuilder;
import org.junit.jupiter.api.Test;
@ -44,7 +44,7 @@ public class WordCountTransformerTest {
final MockProcessorContext<String, String> context = new MockProcessorContext<>();
// Create and initialize the transformer under test; including its provided store
final WordCountTransformerDemo.MyTransformerSupplier supplier = new WordCountTransformerDemo.MyTransformerSupplier();
final WordCountTransformerDemo.MyProcessorSupplier supplier = new WordCountTransformerDemo.MyProcessorSupplier();
for (final StoreBuilder<?> storeBuilder : supplier.stores()) {
final StateStore store = storeBuilder
.withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
@ -53,16 +53,16 @@ public class WordCountTransformerTest {
store.init(context.getStateStoreContext(), store);
context.getStateStoreContext().register(store, null);
}
final Transformer<String, String, KeyValue<String, String>> transformer = supplier.get();
transformer.init(new org.apache.kafka.streams.processor.MockProcessorContext() {
final Processor<String, String, String, String> processor = supplier.get();
processor.init(new org.apache.kafka.streams.processor.api.MockProcessorContext<String, String>() {
@Override
public <S extends StateStore> S getStateStore(final String name) {
return context.getStateStore(name);
}
@Override
public <K, V> void forward(final K key, final V value) {
context.forward(new Record<>((String) key, (String) value, 0L));
public <K extends String, V extends String> void forward(final Record<K, V> record) {
context.forward(record);
}
@Override
@ -72,7 +72,8 @@ public class WordCountTransformerTest {
});
// send a record to the transformer
transformer.transform("key", "alpha beta\tgamma\n\talpha");
final Record<String, String> record = new Record<>("key", "alpha beta\tgamma\n\talpha", 0L);
processor.process(record);
// note that the transformer does not forward during transform()
assertTrue(context.forwarded().isEmpty());