MINOR: update web docs and examples of Streams with Java8 syntax (#5249)

Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Damian Guy <damian@confluent.io>
This commit is contained in:
Guozhang Wang 2018-06-21 10:02:58 -07:00 committed by GitHub
parent 456b17fc8e
commit d3e264e773
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 80 additions and 187 deletions

View File

@ -898,9 +898,9 @@
<span class="c1">// Aggregating with time-based windowing (here: with 5-minute tumbling windows)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="mi">5</span><span class="o">))</span>
<span class="o">.</span><span class="na">aggregate</span><span class="o">(</span>
<span class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span><span class="n">L</span><span class="o">,</span> <span class="cm">/* initializer */</span>
<span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">newValue</span><span class="o">,</span> <span class="n">aggValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span><span class="o">,</span> <span class="cm">/* adder */</span>
<span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">WindowStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;time-windowed-aggregated-stream-store&quot;</span><span class="o">)</span> <span class="cm">/* state store name */</span>
<span class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span><span class="n">L</span><span class="o">,</span> <span class="cm">/* initializer */</span>
<span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">newValue</span><span class="o">,</span> <span class="n">aggValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span><span class="o">,</span> <span class="cm">/* adder */</span>
<span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">WindowStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;time-windowed-aggregated-stream-store&quot;</span><span class="o">)</span> <span class="cm">/* state store name */</span>
<span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span> <span class="cm">/* serde for aggregate value */</span>
<span class="c1">// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)</span>
@ -908,8 +908,8 @@
<span class="n">aggregate</span><span class="o">(</span>
<span class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span><span class="n">L</span><span class="o">,</span> <span class="cm">/* initializer */</span>
<span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">newValue</span><span class="o">,</span> <span class="n">aggValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span><span class="o">,</span> <span class="cm">/* adder */</span>
<span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">leftAggValue</span><span class="o">,</span> <span class="n">rightAggValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">leftAggValue</span> <span class="o">+</span> <span class="n">rightAggValue</span><span class="o">,</span> <span class="cm">/* session merger */</span>
<span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">SessionStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;sessionized-aggregated-stream-store&quot;</span><span class="o">)</span> <span class="cm">/* state store name */</span>
<span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">leftAggValue</span><span class="o">,</span> <span class="n">rightAggValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">leftAggValue</span> <span class="o">+</span> <span class="n">rightAggValue</span><span class="o">,</span> <span class="cm">/* session merger */</span>
<span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">SessionStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;sessionized-aggregated-stream-store&quot;</span><span class="o">)</span> <span class="cm">/* state store name */</span>
<span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span> <span class="cm">/* serde for aggregate value */</span>
<span class="c1">// Java 7 examples</span>

View File

@ -255,18 +255,8 @@ public class CustomMaxAggregator implements Processor&lt;String, Long&gt; {
@Override
public void init(ProcessorContext context) {
this.context = context;
context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
@Override
public void punctuate(long timestamp) {
flushStore();
}
});
context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
@Override
public void punctuate(long timestamp) {
flushStore();
}
});
context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, time -&gt; flushStore());
context.schedule(10000, PunctuationType.STREAM_TIME, time -&gt; flushStore());
store = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("aggStore");
}
@ -286,9 +276,6 @@ public class CustomMaxAggregator implements Processor&lt;String, Long&gt; {
}
}
@Override
public void punctuate(long timestamp) {} // deprecated; not used
@Override
public void close() {}
}
@ -407,12 +394,8 @@ punctuator.punctuate(/*timestamp*/ 0L);
</div>
</div>
<div class="pagination">
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/datatypes"
class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide/interactive-queries"
class="pagination__btn pagination__btn__next">Next</a>
</div>
<a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide/interactive-queries" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>

View File

@ -28,16 +28,14 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
@ -83,7 +81,7 @@ public class PageViewTypedDemo {
public String region;
}
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@ -151,56 +149,56 @@ public class PageViewTypedDemo {
Consumed.with(Serdes.String(), userProfileSerde));
KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
.leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
@Override
public PageViewByRegion apply(PageView view, UserProfile profile) {
PageViewByRegion viewByRegion = new PageViewByRegion();
viewByRegion.user = view.user;
viewByRegion.page = view.page;
.leftJoin(users, (view, profile) -> {
PageViewByRegion viewByRegion = new PageViewByRegion();
viewByRegion.user = view.user;
viewByRegion.page = view.page;
if (profile != null) {
viewByRegion.region = profile.region;
} else {
viewByRegion.region = "UNKNOWN";
}
return viewByRegion;
}
})
.map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() {
@Override
public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) {
return new KeyValue<>(viewRegion.region, viewRegion);
if (profile != null) {
viewByRegion.region = profile.region;
} else {
viewByRegion.region = "UNKNOWN";
}
return viewByRegion;
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
.groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
.count()
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
@Override
public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
wViewByRegion.windowStart = key.window().start();
wViewByRegion.region = key.key();
.map((key, value) -> {
WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
wViewByRegion.windowStart = key.window().start();
wViewByRegion.region = key.key();
RegionCount rCount = new RegionCount();
rCount.region = key.key();
rCount.count = value;
RegionCount rCount = new RegionCount();
rCount.region = key.key();
rCount.count = value;
return new KeyValue<>(wViewByRegion, rCount);
}
return new KeyValue<>(wViewByRegion, rCount);
});
// write to the result topic
regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
final CountDownLatch latch = new CountDownLatch(1);
// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the input data is finite.
Thread.sleep(5000L);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-pipe-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
streams.close();
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}

View File

@ -33,13 +33,9 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import java.util.Properties;
@ -79,46 +75,30 @@ public class PageViewUntypedDemo {
KTable<String, JsonNode> users = builder.table("streams-userprofile-input", consumed);
KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
@Override
public String apply(JsonNode record) {
return record.get("region").textValue();
}
});
KTable<String, String> userRegions = users.mapValues(record -> record.get("region").textValue());
KStream<JsonNode, JsonNode> regionCount = views
.leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() {
@Override
public JsonNode apply(JsonNode view, String region) {
ObjectNode jNode = JsonNodeFactory.instance.objectNode();
.leftJoin(userRegions, (view, region) -> {
ObjectNode jNode = JsonNodeFactory.instance.objectNode();
return (JsonNode) jNode.put("user", view.get("user").textValue())
.put("page", view.get("page").textValue())
.put("region", region == null ? "UNKNOWN" : region);
return jNode.put("user", view.get("user").textValue())
.put("page", view.get("page").textValue())
.put("region", region == null ? "UNKNOWN" : region);
}
})
.map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() {
@Override
public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
}
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.windowedBy(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
.count()
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
@Override
public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) {
ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
keyNode.put("window-start", key.window().start())
.put("region", key.key());
.map((key, value) -> {
ObjectNode keyNode = JsonNodeFactory.instance.objectNode();
keyNode.put("window-start", key.window().start())
.put("region", key.key());
ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
valueNode.put("count", value);
ObjectNode valueNode = JsonNodeFactory.instance.objectNode();
valueNode.put("count", value);
return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
}
return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode);
});
// write to the result topic

View File

@ -38,7 +38,7 @@ import java.util.concurrent.CountDownLatch;
*/
public class PipeDemo {
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

View File

@ -23,10 +23,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
@ -71,7 +68,7 @@ public class TemperatureDemo {
// window size within which the filtering is applied
private static final int TEMPERATURE_WINDOW_SIZE = 5;
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temperature");
@ -89,30 +86,17 @@ public class TemperatureDemo {
KStream<Windowed<String>, String> max = source
// temperature values are sent without a key (null), so in order
// to group and reduce them, a key is needed ("temp" has been chosen)
.selectKey(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String value) {
return "temp";
}
})
.selectKey((key, value) -> "temp")
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
.reduce(new Reducer<String>() {
@Override
public String apply(String value1, String value2) {
if (Integer.parseInt(value1) > Integer.parseInt(value2))
return value1;
else
return value2;
}
.reduce((value1, value2) -> {
if (Integer.parseInt(value1) > Integer.parseInt(value2))
return value1;
else
return value2;
})
.toStream()
.filter(new Predicate<Windowed<String>, String>() {
@Override
public boolean test(Windowed<String> key, String value) {
return Integer.parseInt(value) > TEMPERATURE_THRESHOLD;
}
});
.filter((key, value) -> Integer.parseInt(value) > TEMPERATURE_THRESHOLD);
Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);

View File

@ -23,9 +23,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.util.Arrays;
import java.util.Locale;
@ -46,7 +44,7 @@ import java.util.concurrent.CountDownLatch;
*/
public class WordCountDemo {
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@ -64,18 +62,8 @@ public class WordCountDemo {
KStream<String, String> source = builder.stream("streams-plaintext-input");
KTable<String, Long> counts = source
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
}
})
.groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String value) {
return value;
}
})
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
.groupBy((key, value) -> value)
.count();
// need to override value serde to Long type

View File

@ -26,7 +26,6 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
@ -61,19 +60,16 @@ public class WordCountProcessorDemo {
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
this.context = context;
this.context.schedule(1000, PunctuationType.STREAM_TIME, new Punctuator() {
@Override
public void punctuate(long timestamp) {
try (KeyValueIterator<String, Integer> iter = kvStore.all()) {
System.out.println("----------- " + timestamp + " ----------- ");
this.context.schedule(1000, PunctuationType.STREAM_TIME, timestamp -> {
try (KeyValueIterator<String, Integer> iter = kvStore.all()) {
System.out.println("----------- " + timestamp + " ----------- ");
while (iter.hasNext()) {
KeyValue<String, Integer> entry = iter.next();
while (iter.hasNext()) {
KeyValue<String, Integer> entry = iter.next();
System.out.println("[" + entry.key + ", " + entry.value + "]");
System.out.println("[" + entry.key + ", " + entry.value + "]");
context.forward(entry.key, entry.value.toString());
}
context.forward(entry.key, entry.value.toString());
}
}
});
@ -103,7 +99,7 @@ public class WordCountProcessorDemo {
}
}
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@ -123,7 +119,7 @@ public class WordCountProcessorDemo {
Stores.inMemoryKeyValueStore("Counts"),
Serdes.String(),
Serdes.Integer()),
"Process");
"Process");
builder.addSink("Sink", "streams-wordcount-processor-output", "Process");

View File

@ -44,24 +44,10 @@ public class LineSplit {
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("streams-plaintext-input")
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split("\\W+"));
}
})
.to("streams-linesplit-output");
/* ------- use the code below for Java 8 and uncomment the above ----
builder.stream("streams-plaintext-input")
.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.to("streams-linesplit-output");
----------------------------------------------------------------- */
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);

View File

@ -50,26 +50,6 @@ public class WordCount {
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("streams-plaintext-input")
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
}
})
.groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String value) {
return value;
}
})
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
.toStream()
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
/* ------- use the code below for Java 8 and comment the above ----
builder.<String, String>stream("streams-plaintext-input")
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
@ -77,8 +57,6 @@ public class WordCount {
.toStream()
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
----------------------------------------------------------------- */
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);