diff --git a/docs/streams/index.html b/docs/streams/index.html
index 904338e5c6c..fe8504d77ff 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -153,26 +153,28 @@
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.state.KeyValueStore;
-
+
 import java.util.Arrays;
 import java.util.Properties;
-
+
 public class WordCountApplication {
-
+
     public static void main(final String[] args) throws Exception {
         Properties config = new Properties();
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
         config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-
+
         StreamsBuilder builder = new StreamsBuilder();
         KStream<String, String> textLines = builder.stream("TextLinesTopic");
         KTable<String, Long> wordCounts = textLines
@@ -180,11 +182,11 @@
             .groupBy((key, word) -> word)
             .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
         wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
-
+
         KafkaStreams streams = new KafkaStreams(builder.build(), config);
         streams.start();
     }
-
+
 }
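For readability, here is a sketch of how the Java 8 WordCount example reads once this patch is applied, assembled from the two hunks above. The `.flatMapValues(...)` line falls in the unchanged region between the hunks, so it is reconstructed by analogy with the Scala example further down rather than taken from the diff.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        // Streams configuration, taken verbatim from the hunk above.
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            // Not visible in the diff: reconstructed by analogy with the Scala
            // example below (assumption).
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            // Bytes/byte[] are the serde types of the underlying state store,
            // hence the Bytes import this patch adds.
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }

}
```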
@@ ... @@
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Arrays;
@@ -247,16 +251,17 @@
 import java.lang.Long
 import java.util.Properties
 import java.util.concurrent.TimeUnit
-
+
 import org.apache.kafka.common.serialization._
+import org.apache.kafka.common.utils.Bytes
 import org.apache.kafka.streams._
-import org.apache.kafka.streams.kstream.{KeyValueMapper, Materialized, Produced, ValueMapper}
-import org.apache.kafka.streams.state.KeyValueStore;
-
+import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced}
+import org.apache.kafka.streams.state.KeyValueStore
+
 import scala.collection.JavaConverters.asJavaIterableConverter
-
+
 object WordCountApplication {
-
+
   def main(args: Array[String]) {
     val config: Properties = {
       val p = new Properties()
@@ -266,23 +271,23 @@
       p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
       p
     }
-
+
     val builder: StreamsBuilder = new StreamsBuilder()
     val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
     val wordCounts: KTable[String, Long] = textLines
       .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
       .groupBy((_, word) => word)
       .count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
-    wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()))
-
+    wordCounts.toStream().to("WordsWithCountsTopic", Produced.`with`(Serdes.String(), Serdes.Long()))
+
     val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
     streams.start()
-
+
     Runtime.getRuntime.addShutdownHook(new Thread(() => {
       streams.close(10, TimeUnit.SECONDS)
     }))
   }
-
+
 }
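And a sketch of the patched Scala example in full, assembled from the two Scala hunks. The three `p.put(...)` calls for the application id, bootstrap servers, and key serde sit in the unchanged gap between the hunks, so they are assumed to mirror the Java configuration above; everything else, including the `Produced.`with`` fix, comes from the diff.

```scala
import java.lang.Long
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced}
import org.apache.kafka.streams.state.KeyValueStore

import scala.collection.JavaConverters.asJavaIterableConverter

object WordCountApplication {

  def main(args: Array[String]) {
    val config: Properties = {
      val p = new Properties()
      // The next three settings are not visible in the diff and are assumed
      // to mirror the Java example above (assumption).
      p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
      p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      p
    }

    val builder: StreamsBuilder = new StreamsBuilder()
    val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
    val wordCounts: KTable[String, Long] = textLines
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      .groupBy((_, word) => word)
      .count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
    // `with` is a reserved word in Scala, so the Java static factory has to be
    // called with backticks: Produced.`with`(...) -- the bug this patch fixes.
    wordCounts.toStream().to("WordsWithCountsTopic", Produced.`with`(Serdes.String(), Serdes.Long()))

    val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
    streams.start()

    // Close the Streams instance cleanly on JVM shutdown.
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      streams.close(10, TimeUnit.SECONDS)
    }))
  }

}
```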