MINOR: Add missing imports to 'Hello Kafka Streams' examples (#4535)

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Daniel Wojda 2018-02-08 17:53:57 +00:00 committed by Guozhang Wang
parent ac267dc5ce
commit d8eddc6e16
1 changed file with 25 additions and 20 deletions

View File

@@ -153,26 +153,28 @@
<div class="code-example__snippet b-java-8 selected"> <div class="code-example__snippet b-java-8 selected">
<pre class="brush: java;"> <pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays; import java.util.Arrays;
import java.util.Properties; import java.util.Properties;
public class WordCountApplication { public class WordCountApplication {
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
Properties config = new Properties(); Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder(); StreamsBuilder builder = new StreamsBuilder();
KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic"); KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
KTable&lt;String, Long&gt; wordCounts = textLines KTable&lt;String, Long&gt; wordCounts = textLines
@@ -180,11 +182,11 @@
.groupBy((key, word) -> word) .groupBy((key, word) -> word)
.count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store")); .count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"));
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long())); wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), config); KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start(); streams.start();
} }
} }
</pre> </pre>
</div> </div>
@@ -192,14 +194,16 @@
<div class="code-example__snippet b-java-7"> <div class="code-example__snippet b-java-7">
<pre class="brush: java;"> <pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays; import java.util.Arrays;
@@ -247,16 +251,17 @@
import java.lang.Long import java.lang.Long
import java.util.Properties import java.util.Properties
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization._ import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams._ import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KeyValueMapper, Materialized, Produced, ValueMapper} import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced}
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore
import scala.collection.JavaConverters.asJavaIterableConverter import scala.collection.JavaConverters.asJavaIterableConverter
object WordCountApplication { object WordCountApplication {
def main(args: Array[String]) { def main(args: Array[String]) {
val config: Properties = { val config: Properties = {
val p = new Properties() val p = new Properties()
@@ -266,23 +271,23 @@
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass) p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
p p
} }
val builder: StreamsBuilder = new StreamsBuilder() val builder: StreamsBuilder = new StreamsBuilder()
val textLines: KStream[String, String] = builder.stream("TextLinesTopic") val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
val wordCounts: KTable[String, Long] = textLines val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava) .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
.groupBy((_, word) => word) .groupBy((_, word) => word)
.count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]]) .count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long())) wordCounts.toStream().to("WordsWithCountsTopic", Produced.`with`(Serdes.String(), Serdes.Long()))
val streams: KafkaStreams = new KafkaStreams(builder.build(), config) val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams.start() streams.start()
Runtime.getRuntime.addShutdownHook(new Thread(() => { Runtime.getRuntime.addShutdownHook(new Thread(() => {
streams.close(10, TimeUnit.SECONDS) streams.close(10, TimeUnit.SECONDS)
})) }))
} }
} }
</pre> </pre>
</div> </div>