mirror of https://github.com/apache/kafka.git
MINOR: update streams quickstart for KIP-182
Author: Damian Guy <damian.guy@gmail.com> Reviewers: Michael G. Noll <michael@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com> Closes #3984 from dguy/quickstart-update
This commit is contained in:
parent
ac7695c32a
commit
93b71e7dee
|
@ -44,7 +44,8 @@ final Serde<Long> longSerde = Serdes.Long();
|
||||||
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
|
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
|
||||||
// represent lines of text (for the sake of this example, we ignore whatever may be stored
|
// represent lines of text (for the sake of this example, we ignore whatever may be stored
|
||||||
// in the message keys).
|
// in the message keys).
|
||||||
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-plaintext-input");
|
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
|
||||||
|
Consumed.with(stringSerde, stringSerde);
|
||||||
|
|
||||||
KTable<String, Long> wordCounts = textLines
|
KTable<String, Long> wordCounts = textLines
|
||||||
// Split each text line, by whitespace, into words.
|
// Split each text line, by whitespace, into words.
|
||||||
|
@ -54,10 +55,10 @@ KTable<String, Long> wordCounts = textLines
|
||||||
.groupBy((key, value) -> value)
|
.groupBy((key, value) -> value)
|
||||||
|
|
||||||
// Count the occurrences of each word (message key).
|
// Count the occurrences of each word (message key).
|
||||||
.count("Counts")
|
.count()
|
||||||
|
|
||||||
// Store the running counts as a changelog stream to the output topic.
|
// Store the running counts as a changelog stream to the output topic.
|
||||||
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");
|
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
|
||||||
</pre>
|
</pre>
|
||||||
|
|
||||||
<p>
|
<p>
|
||||||
|
|
|
@ -485,12 +485,14 @@
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.count("Counts");
|
// Materialize the result into a KeyValueStore named "counts-store".
|
||||||
|
// The Materialized store is always of type <Bytes, byte[]> as this is the format of the inner most store.
|
||||||
|
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));
|
||||||
</pre>
|
</pre>
|
||||||
|
|
||||||
<p>
|
<p>
|
||||||
Note that the <code>count</code> operator has a <code>String</code> typed parameter <code>Counts</code>,
|
Note that the <code>count</code> operator has a <code>Materialized</code> parameter that specifies that the
|
||||||
which stores the running counts that keep being updated as more records are piped and processed from the source Kafka topic.
|
running count should be stored in a state store named <code>counts-store</code>.
|
||||||
This <code>Counts</code> store can be queried in real-time, with details described in the <a href="/{{version}}/documentation/streams/developer-guide#streams_interactive_queries">Developer Manual</a>.
|
This <code>Counts</code> store can be queried in real-time, with details described in the <a href="/{{version}}/documentation/streams/developer-guide#streams_interactive_queries">Developer Manual</a>.
|
||||||
</p>
|
</p>
|
||||||
|
|
||||||
|
@ -502,7 +504,7 @@
|
||||||
</p>
|
</p>
|
||||||
|
|
||||||
<pre class="brush: java;">
|
<pre class="brush: java;">
|
||||||
counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
|
counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());
|
||||||
</pre>
|
</pre>
|
||||||
|
|
||||||
<p>
|
<p>
|
||||||
|
@ -516,8 +518,9 @@
|
||||||
KStream<String, String> source = builder.stream("streams-plaintext-input");
|
KStream<String, String> source = builder.stream("streams-plaintext-input");
|
||||||
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
|
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
|
||||||
.groupBy((key, value) -> value)
|
.groupBy((key, value) -> value)
|
||||||
.count("Counts")
|
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
|
||||||
.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
|
.toStream()
|
||||||
|
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());
|
||||||
</pre>
|
</pre>
|
||||||
|
|
||||||
<p>
|
<p>
|
||||||
|
@ -589,8 +592,9 @@
|
||||||
KStream<String, String> source = builder.stream("streams-plaintext-input");
|
KStream<String, String> source = builder.stream("streams-plaintext-input");
|
||||||
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
|
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
|
||||||
.groupBy((key, value) -> value)
|
.groupBy((key, value) -> value)
|
||||||
.count("Counts")
|
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
|
||||||
.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
|
.toStream()
|
||||||
|
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());
|
||||||
|
|
||||||
final Topology topology = builder.build();
|
final Topology topology = builder.build();
|
||||||
final KafkaStreams streams = new KafkaStreams(topology, props);
|
final KafkaStreams streams = new KafkaStreams(topology, props);
|
||||||
|
|
|
@ -17,12 +17,16 @@
|
||||||
package ${package};
|
package ${package};
|
||||||
|
|
||||||
import org.apache.kafka.common.serialization.Serdes;
|
import org.apache.kafka.common.serialization.Serdes;
|
||||||
|
import org.apache.kafka.common.utils.Bytes;
|
||||||
import org.apache.kafka.streams.KafkaStreams;
|
import org.apache.kafka.streams.KafkaStreams;
|
||||||
import org.apache.kafka.streams.StreamsBuilder;
|
import org.apache.kafka.streams.StreamsBuilder;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
import org.apache.kafka.streams.StreamsConfig;
|
||||||
import org.apache.kafka.streams.Topology;
|
import org.apache.kafka.streams.Topology;
|
||||||
import org.apache.kafka.streams.kstream.KeyValueMapper;
|
import org.apache.kafka.streams.kstream.KeyValueMapper;
|
||||||
|
import org.apache.kafka.streams.kstream.Materialized;
|
||||||
|
import org.apache.kafka.streams.kstream.Produced;
|
||||||
import org.apache.kafka.streams.kstream.ValueMapper;
|
import org.apache.kafka.streams.kstream.ValueMapper;
|
||||||
|
import org.apache.kafka.streams.state.KeyValueStore;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
@ -59,17 +63,19 @@ public class WordCount {
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.count("Counts")
|
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
|
||||||
.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
|
.toStream()
|
||||||
|
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
|
||||||
|
|
||||||
|
|
||||||
/* ------- use the code below for Java 8 and uncomment the above ----
|
/* ------- use the code below for Java 8 and comment the above ----
|
||||||
|
|
||||||
builder.stream("streams-plaintext-input")
|
builder.<String, String>stream("streams-plaintext-input")
|
||||||
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
|
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
|
||||||
.groupBy((key, value) -> value)
|
.groupBy((key, value) -> value)
|
||||||
.count("Counts")
|
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
|
||||||
.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
|
.toStream()
|
||||||
|
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
|
||||||
|
|
||||||
----------------------------------------------------------------- */
|
----------------------------------------------------------------- */
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue