MINOR: Remove Java 7 example code (#16308)

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Jim Galasyn 2024-06-12 16:50:34 -07:00 committed by GitHub
parent cf5a86b654
commit c71d03bb69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 30 additions and 452 deletions

View File

@ -383,8 +383,7 @@ Map&lt;String, KStream&lt;String, Long&gt;&gt; branches =
// KStream branches.get(&quot;Branch-A&quot;) contains all records whose keys start with &quot;A&quot;
// KStream branches.get(&quot;Branch-B&quot;) contains all records whose keys start with &quot;B&quot;
// KStream branches.get(&quot;Branch-C&quot;) contains all other records
// Java 7 example: cf. `filter` for how to create `Predicate` instances</code></pre>
</code></pre>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Filter</strong></p>
@ -401,15 +400,7 @@ Map&lt;String, KStream&lt;String, Long&gt;&gt; branches =
// A filter that selects (keeps) only positive numbers
// Java 8+ example, using lambda expressions
KStream&lt;String, Long&gt; onlyPositives = stream.filter((key, value) -&gt; value &gt; 0);
// Java 7 example
KStream&lt;String, Long&gt; onlyPositives = stream.filter(
new Predicate&lt;String, Long&gt;() {
@Override
public boolean test(String key, Long value) {
return value &gt; 0;
}
});</code></pre>
</code></pre>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>Inverse Filter</strong></p>
@ -427,14 +418,7 @@ KStream&lt;String, Long&gt; onlyPositives = stream.filter(
// Java 8+ example, using lambda expressions
KStream&lt;String, Long&gt; onlyPositives = stream.filterNot((key, value) -&gt; value &lt;= 0);
// Java 7 example
KStream&lt;String, Long&gt; onlyPositives = stream.filterNot(
new Predicate&lt;String, Long&gt;() {
@Override
public boolean test(String key, Long value) {
return value &lt;= 0;
}
});</code></pre>
</code></pre>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>FlatMap</strong></p>
@ -460,8 +444,7 @@ KStream&lt;String, Integer&gt; transformed = stream.flatMap(
return result;
}
);
// Java 7 example: cf. `map` for how to create `KeyValueMapper` instances</code></pre>
</code></pre>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>FlatMapValues</strong></p>
@ -477,8 +460,7 @@ KStream&lt;String, Integer&gt; transformed = stream.flatMap(
<pre class="line-numbers"><code class="language-java">// Split a sentence into words.
KStream&lt;byte[], String&gt; sentences = ...;
KStream&lt;byte[], String&gt; words = sentences.flatMapValues(value -&gt; Arrays.asList(value.split(&quot;\\s+&quot;)));
// Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances</code></pre>
</code></pre>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Foreach</strong></p>
@ -499,15 +481,7 @@ KStream&lt;byte[], String&gt; words = sentences.flatMapValues(value -&gt; Arrays
// Print the contents of the KStream to the local console.
// Java 8+ example, using lambda expressions
stream.foreach((key, value) -&gt; System.out.println(key + &quot; =&gt; &quot; + value));
// Java 7 example
stream.foreach(
new ForeachAction&lt;String, Long&gt;() {
@Override
public void apply(String key, Long value) {
System.out.println(key + &quot; =&gt; &quot; + value);
}
});</code></pre>
</code></pre>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>GroupByKey</strong></p>
@ -598,34 +572,7 @@ KGroupedTable&lt;String, Integer&gt; groupedTable = table.groupBy(
Serdes.Integer()) /* value (note: type was modified) */
);
// Java 7 examples
// Group the stream by a new key and key type
KGroupedStream&lt;String, String&gt; groupedStream = stream.groupBy(
new KeyValueMapper&lt;byte[], String, String&gt;&gt;() {
@Override
public String apply(byte[] key, String value) {
return value;
}
},
Grouped.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.String()) /* value */
);
// Group the table by a new key and key type, and also modify the value and value type.
KGroupedTable&lt;String, Integer&gt; groupedTable = table.groupBy(
new KeyValueMapper&lt;byte[], String, KeyValue&lt;String, Integer&gt;&gt;() {
@Override
public KeyValue&lt;String, Integer&gt; apply(byte[] key, String value) {
return KeyValue.pair(value, value.length());
}
},
Grouped.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.Integer()) /* value (note: type was modified) */
);</code></pre>
</code></pre>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>Cogroup</strong></p>
@ -671,15 +618,7 @@ KTable&lt;byte[], String&gt; table2 = cogroupedStream.windowedBy(TimeWindows.ofS
// as well as the value and the value type.
KStream&lt;String, Integer&gt; transformed = stream.map(
(key, value) -&gt; KeyValue.pair(value.toLowerCase(), value.length()));
// Java 7 example
KStream&lt;String, Integer&gt; transformed = stream.map(
new KeyValueMapper&lt;byte[], String, KeyValue&lt;String, Integer&gt;&gt;() {
@Override
public KeyValue&lt;String, Integer&gt; apply(byte[] key, String value) {
return new KeyValue&lt;&gt;(value.toLowerCase(), value.length());
}
});</code></pre>
</code></pre>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>Map (values only)</strong></p>
@ -698,15 +637,7 @@ KStream&lt;String, Integer&gt; transformed = stream.map(
// Java 8+ example, using lambda expressions
KStream&lt;byte[], String&gt; uppercased = stream.mapValues(value -&gt; value.toUpperCase());
// Java 7 example
KStream&lt;byte[], String&gt; uppercased = stream.mapValues(
new ValueMapper&lt;String&gt;() {
@Override
public String apply(String s) {
return s.toUpperCase();
}
});</code></pre>
</code></pre>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Merge</strong></p>
@ -744,15 +675,7 @@ KStream&lt;byte[], String&gt; merged = stream1.merge(stream2);</code></pre>
// Java 8+ example, using lambda expressions
KStream&lt;byte[], String&gt; unmodifiedStream = stream.peek(
(key, value) -&gt; System.out.println(&quot;key=&quot; + key + &quot;, value=&quot; + value));
// Java 7 example
KStream&lt;byte[], String&gt; unmodifiedStream = stream.peek(
new ForeachAction&lt;byte[], String&gt;() {
@Override
public void apply(byte[] key, String value) {
System.out.println(&quot;key=&quot; + key + &quot;, value=&quot; + value);
}
});</code></pre>
</code></pre>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Print</strong></p>
@ -788,15 +711,7 @@ stream.print(Printed.toFile(&quot;streams.out&quot;).withLabel(&quot;streams&quo
// Derive a new record key from the record&#39;s value. Note how the key type changes, too.
// Java 8+ example, using lambda expressions
KStream&lt;String, String&gt; rekeyed = stream.selectKey((key, value) -&gt; value.split(&quot; &quot;)[0])
// Java 7 example
KStream&lt;String, String&gt; rekeyed = stream.selectKey(
new KeyValueMapper&lt;byte[], String, String&gt;() {
@Override
public String apply(byte[] key, String value) {
return value.split(&quot; &quot;)[0];
}
});</code></pre>
</code></pre>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Table to Stream</strong></p>
@ -895,25 +810,7 @@ KStream&lt;String, Long&gt; wordCounts = textLines
.count()
// Convert the `KTable&lt;String, Long&gt;` into a `KStream&lt;String, Long&gt;`.
.toStream();</code></pre>
<p>WordCount example in Java 7:</p>
<pre class="line-numbers"><code class="language-java">// Code below is equivalent to the previous Java 8+ example above.
KStream&lt;String, String&gt; textLines = ...;
KStream&lt;String, Long&gt; wordCounts = textLines
.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
@Override
public Iterable&lt;String&gt; apply(String value) {
return Arrays.asList(value.toLowerCase().split(&quot;\\W+&quot;));
}
})
.groupBy(new KeyValueMapper&lt;String, String, String&gt;&gt;() {
@Override
public String apply(String key, String word) {
return word;
}
})
.count()
.toStream();</code></pre>
<div class="section" id="aggregating">
<span id="streams-developer-guide-dsl-aggregating"></span><h4><a class="toc-backref" href="#id12">Aggregating</a><a class="headerlink" href="#aggregating" title="Permalink to this headline"></a></h4>
<p>After records are <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateless"><span class="std std-ref">grouped</span></a> by key via <code class="docutils literal"><span class="pre">groupByKey</span></code> or
@ -969,49 +866,7 @@ KTable&lt;byte[], Long&gt; aggregatedTable = groupedTable.aggregate(
(aggKey, oldValue, aggValue) -&gt; aggValue - oldValue.length(), /* subtractor */
Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as(&quot;aggregated-table-store&quot;) /* state store name */
.withValueSerde(Serdes.Long()) /* serde for aggregate value */
// Java 7 examples
// Aggregating a KGroupedStream (note how the value type changes from String to Long)
KTable&lt;byte[], Long&gt; aggregatedStream = groupedStream.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;byte[], String, Long&gt;() { /* adder */
@Override
public Long apply(byte[] aggKey, String newValue, Long aggValue) {
return aggValue + newValue.length();
}
},
Materialized.as(&quot;aggregated-stream-store&quot;)
.withValueSerde(Serdes.Long());
// Aggregating a KGroupedTable (note how the value type changes from String to Long)
KTable&lt;byte[], Long&gt; aggregatedTable = groupedTable.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;byte[], String, Long&gt;() { /* adder */
@Override
public Long apply(byte[] aggKey, String newValue, Long aggValue) {
return aggValue + newValue.length();
}
},
new Aggregator&lt;byte[], String, Long&gt;() { /* subtractor */
@Override
public Long apply(byte[] aggKey, String oldValue, Long aggValue) {
return aggValue - oldValue.length();
}
},
Materialized.as(&quot;aggregated-stream-store&quot;)
.withValueSerde(Serdes.Long());</code></pre>
</code></pre>
<p>Detailed behavior of <code class="docutils literal"><span class="pre">KGroupedStream</span></code>:</p>
<ul class="simple">
<li>Input records with <code class="docutils literal"><span class="pre">null</span></code> keys are ignored.</li>
@ -1084,50 +939,7 @@ KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = grouped
(aggKey, leftAggValue, rightAggValue) -&gt; leftAggValue + rightAggValue, /* session merger */
Materialized.&lt;String, Long, SessionStore&lt;Bytes, byte[]&gt;&gt;as(&quot;sessionized-aggregated-stream-store&quot;) /* state store name */
.withValueSerde(Serdes.Long())); /* serde for aggregate value */
// Java 7 examples
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.windowedBy(Duration.ofMinutes(5))
.aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;String, Long, Long&gt;() { /* adder */
@Override
public Long apply(String aggKey, Long newValue, Long aggValue) {
return aggValue + newValue;
}
},
Materialized.&lt;String, Long, WindowStore&lt;Bytes, byte[]&gt;&gt;as(&quot;time-windowed-aggregated-stream-store&quot;)
.withValueSerde(Serdes.Long()));
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionizedAggregatedStream = groupedStream.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)).
aggregate(
new Initializer&lt;Long&gt;() { /* initializer */
@Override
public Long apply() {
return 0L;
}
},
new Aggregator&lt;String, Long, Long&gt;() { /* adder */
@Override
public Long apply(String aggKey, Long newValue, Long aggValue) {
return aggValue + newValue;
}
},
new Merger&lt;String, Long&gt;() { /* session merger */
@Override
public Long apply(String aggKey, Long leftAggValue, Long rightAggValue) {
return rightAggValue + leftAggValue;
}
},
Materialized.&lt;String, Long, SessionStore&lt;Bytes, byte[]&gt;&gt;as(&quot;sessionized-aggregated-stream-store&quot;)
.withValueSerde(Serdes.Long()));</code></pre>
</code></pre>
<p>Detailed behavior:</p>
<ul class="simple">
<li>The windowed aggregate behaves similar to the rolling aggregate described above. The additional twist is that
@ -1233,33 +1045,7 @@ KTable&lt;String, Long&gt; aggregatedStream = groupedStream.reduce(
KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
(aggValue, newValue) -&gt; aggValue + newValue, /* adder */
(aggValue, oldValue) -&gt; aggValue - oldValue /* subtractor */);
// Java 7 examples
// Reducing a KGroupedStream
KTable&lt;String, Long&gt; aggregatedStream = groupedStream.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
});
// Reducing a KGroupedTable
KTable&lt;String, Long&gt; aggregatedTable = groupedTable.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
},
new Reducer&lt;Long&gt;() { /* subtractor */
@Override
public Long apply(Long aggValue, Long oldValue) {
return aggValue - oldValue;
}
});</code></pre>
</code></pre>
<p>Detailed behavior for <code class="docutils literal"><span class="pre">KGroupedStream</span></code>:</p>
<ul class="simple">
<li>Input records with <code class="docutils literal"><span class="pre">null</span></code> keys are ignored in general.</li>
@ -1328,31 +1114,7 @@ KTable&lt;Windowed&lt;String&gt;, Long&gt; sessionzedAggregatedStream = groupedS
.reduce(
(aggValue, newValue) -&gt; aggValue + newValue /* adder */
);
// Java 7 examples
// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.windowedBy(
TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)) /* time-based window */)
.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
});
// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable&lt;Windowed&lt;String&gt;, Long&gt; timeWindowedAggregatedStream = groupedStream.windowedBy(
SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5))) /* session window */
.reduce(
new Reducer&lt;Long&gt;() { /* adder */
@Override
public Long apply(Long aggValue, Long newValue) {
return aggValue + newValue;
}
});</code></pre>
</code></pre>
<p>Detailed behavior:</p>
<ul class="simple">
<li>The windowed reduce behaves similar to the rolling reduce described above. The additional twist is that the
@ -1812,21 +1574,7 @@ KStream&lt;String, String&gt; joined = left.join(right,
Serdes.Long(), /* left value */
Serdes.Double()) /* right value */
);
// Java 7 example
KStream&lt;String, String&gt; joined = left.join(right,
new ValueJoiner&lt;Long, Double, String&gt;() {
@Override
public String apply(Long leftValue, Double rightValue) {
return &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue;
}
},
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
Joined.with(
Serdes.String(), /* key */
Serdes.Long(), /* left value */
Serdes.Double()) /* right value */
);</code></pre>
</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is <em>key-based</em>, i.e. with the join predicate <code class="docutils literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code>, and <em>window-based</em>, i.e. two input records are joined if and only if their
@ -1869,21 +1617,7 @@ KStream&lt;String, String&gt; joined = left.leftJoin(right,
Serdes.Long(), /* left value */
Serdes.Double()) /* right value */
);
// Java 7 example
KStream&lt;String, String&gt; joined = left.leftJoin(right,
new ValueJoiner&lt;Long, Double, String&gt;() {
@Override
public String apply(Long leftValue, Double rightValue) {
return &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue;
}
},
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
Joined.with(
Serdes.String(), /* key */
Serdes.Long(), /* left value */
Serdes.Double()) /* right value */
);</code></pre>
</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is <em>key-based</em>, i.e. with the join predicate <code class="docutils literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code>, and <em>window-based</em>, i.e. two input records are joined if and only if their
@ -1930,21 +1664,7 @@ KStream&lt;String, String&gt; joined = left.outerJoin(right,
Serdes.Long(), /* left value */
Serdes.Double()) /* right value */
);
// Java 7 example
KStream&lt;String, String&gt; joined = left.outerJoin(right,
new ValueJoiner&lt;Long, Double, String&gt;() {
@Override
public String apply(Long leftValue, Double rightValue) {
return &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue;
}
},
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
Joined.with(
Serdes.String(), /* key */
Serdes.Long(), /* left value */
Serdes.Double()) /* right value */
);</code></pre>
</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is <em>key-based</em>, i.e. with the join predicate <code class="docutils literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code>, and <em>window-based</em>, i.e. two input records are joined if and only if their
@ -2197,15 +1917,7 @@ KTable&lt;String, Double&gt; right = ...;
KTable&lt;String, String&gt; joined = left.join(right,
(leftValue, rightValue) -&gt; &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue /* ValueJoiner */
);
// Java 7 example
KTable&lt;String, String&gt; joined = left.join(right,
new ValueJoiner&lt;Long, Double, String&gt;() {
@Override
public String apply(Long leftValue, Double rightValue) {
return &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue;
}
});</code></pre>
</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is <em>key-based</em>, i.e. with the join predicate <code class="docutils literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code>.</p>
@ -2244,15 +1956,7 @@ KTable&lt;String, Double&gt; right = ...;
KTable&lt;String, String&gt; joined = left.leftJoin(right,
(leftValue, rightValue) -&gt; &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue /* ValueJoiner */
);
// Java 7 example
KTable&lt;String, String&gt; joined = left.leftJoin(right,
new ValueJoiner&lt;Long, Double, String&gt;() {
@Override
public String apply(Long leftValue, Double rightValue) {
return &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue;
}
});</code></pre>
</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is <em>key-based</em>, i.e. with the join predicate <code class="docutils literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code>.</p>
@ -2294,15 +1998,7 @@ KTable&lt;String, Double&gt; right = ...;
KTable&lt;String, String&gt; joined = left.outerJoin(right,
(leftValue, rightValue) -&gt; &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue /* ValueJoiner */
);
// Java 7 example
KTable&lt;String, String&gt; joined = left.outerJoin(right,
new ValueJoiner&lt;Long, Double, String&gt;() {
@Override
public String apply(Long leftValue, Double rightValue) {
return &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue;
}
});</code></pre>
</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is <em>key-based</em>, i.e. with the join predicate <code class="docutils literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code>.</p>
@ -2816,19 +2512,7 @@ KStream&lt;String, String&gt; joined = left.join(right,
.withValueSerde(Serdes.Long()) /* left value */
.withGracePeriod(Duration.ZERO) /* grace period */
);
// Java 7 example
KStream&lt;String, String&gt; joined = left.join(right,
new ValueJoiner&lt;Long, Double, String&gt;() {
@Override
public String apply(Long leftValue, Double rightValue) {
return &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue;
}
},
Joined.keySerde(Serdes.String()) /* key */
.withValueSerde(Serdes.Long()) /* left value */
.withGracePeriod(Duration.ZERO) /* grace period */
);</code></pre>
</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is <em>key-based</em>, i.e. with the join predicate <code class="docutils literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code>.</p>
@ -2878,19 +2562,7 @@ KStream&lt;String, String&gt; joined = left.leftJoin(right,
.withValueSerde(Serdes.Long()) /* left value */
.withGracePeriod(Duration.ZERO) /* grace period */
);
// Java 7 example
KStream&lt;String, String&gt; joined = left.leftJoin(right,
new ValueJoiner&lt;Long, Double, String&gt;() {
@Override
public String apply(Long leftValue, Double rightValue) {
return &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue;
}
},
Joined.keySerde(Serdes.String()) /* key */
.withValueSerde(Serdes.Long()) /* left value */
.withGracePeriod(Duration.ZERO) /* grace period */
);</code></pre>
</code></pre>
<p>Detailed behavior:</p>
<ul>
<li><p class="first">The join is <em>key-based</em>, i.e. with the join predicate <code class="docutils literal"><span class="pre">leftRecord.key</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code>.</p>
@ -3101,21 +2773,7 @@ KStream&lt;String, String&gt; joined = left.join(right,
(leftKey, leftValue) -&gt; leftKey.length(), /* derive a (potentially) new key by which to lookup against the table */
(leftValue, rightValue) -&gt; &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue /* ValueJoiner */
);
// Java 7 example
KStream&lt;String, String&gt; joined = left.join(right,
new KeyValueMapper&lt;String, Long, Integer&gt;() { /* derive a (potentially) new key by which to lookup against the table */
@Override
public Integer apply(String key, Long value) {
return key.length();
}
},
new ValueJoiner&lt;Long, Double, String&gt;() {
@Override
public String apply(Long leftValue, Double rightValue) {
return &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue;
}
});</code></pre>
</code></pre>
<p>Detailed behavior:</p>
<ul class="last">
<li><p class="first">The join is indirectly <em>key-based</em>, i.e. with the join predicate <code class="docutils literal"><span class="pre">KeyValueMapper#apply(leftRecord.key,</span> <span class="pre">leftRecord.value)</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code>.</p>
@ -3153,21 +2811,7 @@ KStream&lt;String, String&gt; joined = left.leftJoin(right,
(leftKey, leftValue) -&gt; leftKey.length(), /* derive a (potentially) new key by which to lookup against the table */
(leftValue, rightValue) -&gt; &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue /* ValueJoiner */
);
// Java 7 example
KStream&lt;String, String&gt; joined = left.leftJoin(right,
new KeyValueMapper&lt;String, Long, Integer&gt;() { /* derive a (potentially) new key by which to lookup against the table */
@Override
public Integer apply(String key, Long value) {
return key.length();
}
},
new ValueJoiner&lt;Long, Double, String&gt;() {
@Override
public String apply(Long leftValue, Double rightValue) {
return &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue;
}
});</code></pre>
</code></pre>
<p>Detailed behavior:</p>
<ul class="last">
<li><p class="first">The join is indirectly <em>key-based</em>, i.e. with the join predicate <code class="docutils literal"><span class="pre">KeyValueMapper#apply(leftRecord.key,</span> <span class="pre">leftRecord.value)</span> <span class="pre">==</span> <span class="pre">rightRecord.key</span></code>.</p>

View File

@ -152,31 +152,19 @@ streams.start();</code></pre>
<p>To catch any unexpected exceptions, you can set an <code class="docutils literal"><span class="pre">java.lang.Thread.UncaughtExceptionHandler</span></code> before you start the
application. This handler is called whenever a stream thread is terminated by an unexpected exception:</p>
<pre class="line-numbers"><code class="language-java">// Java 8+, using lambda expressions
streams.setUncaughtExceptionHander((exception) -&gt; StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);</code></pre>
<p>The <code class="docutils literal"><span class="pre">StreamsUncaughtExceptionHandler</span></code> interface enables responding to exceptions not handled by Kafka Streams. It has one method, <code class="docutils literal"><span class="pre">handle</span></code>, that returns an enum of type <code class="docutils literal"><span class="pre">StreamThreadExceptionResponse</span></code>. You have the opportunity to define how Streams responds to the exception, with three possible values: <code class="docutils literal"><span class="pre">REPLACE_THREAD</span></code>, <code class="docutils literal"><span class="pre">SHUTDOWN_CLIENT</span></code>, or <code class="docutils literal"><span class="pre">SHUTDOWN_APPLICATION</span></code>.
<p>The <code class="docutils literal"><span class="pre">SHUTDOWN_APPLICATION</span></code> option is best-effort only and doesn't guarantee that all application instances will be stopped.
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -&gt; {
// here you should examine the throwable/exception and perform an appropriate action!
});
</code></pre>
<p>To stop the application instance, call the <code class="docutils literal"><span class="pre">KafkaStreams#close()</span></code> method:</p>
<pre class="line-numbers"><code class="language-java">// Stop the Kafka Streams threads
streams.close();</code></pre>
<p>To allow your application to gracefully shutdown in response to SIGTERM, it is recommended that you add a shutdown hook
and call <code class="docutils literal"><span class="pre">KafkaStreams#close</span></code>.</p>
<ul>
<li><p class="first">Here is a shutdown hook example in Java 8+:</p>
<p class="first">Here is a shutdown hook example in Java 8+:</p>
<pre class="line-numbers"><code class="language-java">// Add shutdown hook to stop the Kafka Streams threads.
// You can optionally provide a timeout to `close`.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));</code></pre>
</li>
<li><p class="first">Here is a shutdown hook example in Java 7:</p>
<pre class="line-numbers"><code class="language-java">// Add shutdown hook to stop the Kafka Streams threads.
// You can optionally provide a timeout to `close`.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
streams.close();
}
}));</code></pre>
</li>
</ul>
<p>After an application is stopped, Kafka Streams will migrate any tasks that had been running in this instance to available remaining
instances.</p>
</div>

View File

@ -193,7 +193,6 @@
<div class="code-example">
<div class="btn-group">
<a class="selected b-java-8" data-section="java-8">Java 8+</a>
<a class="b-java-7" data-section="java-7">Java 7</a>
<a class="b-scala" data-section="scala">Scala</a>
</div>
@ -233,59 +232,6 @@ public class WordCountApplication {
streams.start();
}
}</code></pre>
</div>
<div class="code-example__snippet b-java-7">
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Properties;
public class WordCountApplication {
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream&lt;String, String&gt; textLines = builder.stream("TextLinesTopic");
KTable&lt;String, Long&gt; wordCounts = textLines
.flatMapValues(new ValueMapper&lt;String, Iterable&lt;String&gt;&gt;() {
@Override
public Iterable&lt;String&gt; apply(String textLine) {
return Arrays.asList(textLine.toLowerCase().split("\\W+"));
}
})
.groupBy(new KeyValueMapper&lt;String, String, String&gt;() {
@Override
public String apply(String key, String word) {
return word;
}
})
.count(Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("counts-store"));
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}</code></pre>
</div>