KAFKA-5817; Add Serialized class and overloads to KStream#groupBy and KStream#groupByKey

Part of KIP-182
- Add the `Serialized` class
- Implement overloads of `KStream#groupByKey` and `KStream#groupBy`
- Deprecate the existing overloads that take explicit `Serde` arguments
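
For reference, a minimal before/after sketch of the resulting call-site change (the stream, serdes, and variable names are illustrative, not part of this commit):

// Deprecated form: serdes passed as separate arguments
KGroupedStream&lt;String, Long&gt; oldStyle = stream.groupByKey(Serdes.String(), Serdes.Long());
// New form (KIP-182): serdes bundled into a Serialized instance
KGroupedStream&lt;String, Long&gt; newStyle = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()));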

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3772 from dguy/kafka-5817
Damian Guy 2017-09-06 10:43:14 +01:00
parent 2fb5664bf4
commit b687c06800
13 changed files with 218 additions and 47 deletions

View File

@@ -842,8 +842,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
// When the key and/or value types do not match the configured
// default serdes, we must explicitly specify serdes.
KGroupedStream&lt;byte[], String&gt; groupedStream = stream.groupByKey(
Serdes.ByteArray(), /* key */
Serdes.String() /* value */
Serialized.with(
Serdes.ByteArray(), /* key */
Serdes.String()) /* value */
);
</pre>
</td>
@@ -883,15 +884,17 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
// Group the stream by a new key and key type
KGroupedStream&lt;String, String&gt; groupedStream = stream.groupBy(
(key, value) -> value,
Serdes.String(), /* key (note: type was modified) */
Serdes.String() /* value */
Serialized.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.String()) /* value */
);
// Group the table by a new key and key type, and also modify the value and value type.
KGroupedTable&lt;String, Integer&gt; groupedTable = table.groupBy(
(key, value) -> KeyValue.pair(value, value.length()),
Serdes.String(), /* key (note: type was modified) */
Serdes.Integer() /* value (note: type was modified) */
Serialized.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.Integer()) /* value (note: type was modified) */
);
@@ -905,8 +908,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
return value;
}
},
Serdes.String(), /* key (note: type was modified) */
Serdes.String() /* value */
Serialized.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.String()) /* value */
);
// Group the table by a new key and key type, and also modify the value and value type.
@@ -917,8 +921,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
return KeyValue.pair(value, value.length());
}
},
Serdes.String(), /* key (note: type was modified) */
Serdes.Integer() /* value (note: type was modified) */
Serialized.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.Integer()) /* value (note: type was modified) */
);
</pre>
</td>
@@ -1659,7 +1664,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
KStream&lt;String, Integer&gt; wordCounts = ...;
KGroupedStream&lt;String, Integer&gt; groupedStream = wordCounts
.groupByKey(Serdes.String(), Serdes.Integer());
.groupByKey(Serialized.with(Serdes.String(), Serdes.Integer()));
KTable&lt;String, Integer&gt; aggregated = groupedStream.aggregate(
() -> 0, /* initializer */
@@ -1763,7 +1768,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
// its prime purpose in this example is to show the *effects* of the grouping
// in the subsequent aggregation.
KGroupedTable&lt;String, Integer&gt; groupedTable = userProfiles
.groupBy((user, region) -> KeyValue.pair(region, user.length()), Serdes.String(), Serdes.Integer());
.groupBy((user, region) -> KeyValue.pair(region, user.length()), Serialized.with(Serdes.String(), Serdes.Integer()));
KTable&lt;String, Integer&gt; aggregated = groupedTable.aggregate(
() -> 0, /* initializer */
@@ -2117,7 +2122,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
// Define the processing topology (here: WordCount)
KGroupedStream&lt;String, String&gt; groupedByWord = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, stringSerde, stringSerde);
.groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
// Create a key-value store named "CountsKeyValueStore" for the all-time word counts
groupedByWord.count("CountsKeyValueStore");
@@ -2173,7 +2178,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
// Define the processing topology (here: WordCount)
KGroupedStream&lt;String, String&gt; groupedByWord = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, stringSerde, stringSerde);
.groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
// Create a window state store named "CountsWindowStore" that contains the word counts for every minute
groupedByWord.count(TimeWindows.of(60000), "CountsWindowStore");
@@ -2396,7 +2401,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
KGroupedStream&lt;String, String&gt; groupedByWord = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, stringSerde, stringSerde);
.groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));
// This call to `count()` creates a state store named "word-count".
// The state store is discoverable and can be queried interactively.

View File

@@ -28,6 +28,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
@@ -168,7 +169,7 @@ public class PageViewTypedDemo {
return new KeyValue<>(viewRegion.region, viewRegion);
}
})
.groupByKey(Serdes.String(), pageViewByRegionSerde)
.groupByKey(Serialized.with(Serdes.String(), pageViewByRegionSerde))
.count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
// TODO: we can merge this toStream().map(...) with a single toStream(...)
.toStream()

View File

@@ -33,6 +33,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -100,7 +101,7 @@ public class PageViewUntypedDemo {
return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
}
})
.groupByKey(Serdes.String(), jsonSerde)
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.count(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000), "RollingSevenDaysOfPageViewsByRegion")
// TODO: we can merge this toStream().map(...) with a single toStream(...)
.toStream()

View File

@@ -1012,13 +1012,38 @@ public interface KStream<K, V> {
* records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
* correctly on its key.
* If the last key changing operator changed the key type, it is recommended to use
* {@link #groupByKey(Serde, Serde)} instead.
* {@link #groupByKey(Serialized)} instead.
*
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
* @see #groupBy(KeyValueMapper)
*/
KGroupedStream<K, V> groupByKey();
/**
* Group the records by their current key into a {@link KGroupedStream} while preserving the original values
* and using the serializers as defined by {@link Serialized}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p>
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
* {@link #through(String)}), an internal repartitioning topic will be created in Kafka.
* This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
* an internally generated name, and "-repartition" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* <p>
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
* correctly on its key.
*
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
* @see #groupBy(KeyValueMapper)
*/
KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized);
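A usage sketch for this new overload, mirroring the test setup further down (builder, topic name, and serdes are illustrative):

// Group a stream by its existing key, supplying explicit serdes via Serialized;
// no repartition topic is created unless a key-changing operator preceded this call
KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), "input-topic");
KGroupedStream<String, String> grouped = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));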
/**
* Group the records by their current key into a {@link KGroupedStream} while preserving the original values.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
@@ -1043,7 +1068,9 @@ public interface KStream<K, V> {
* @param valSerde value serdes for materializing this stream,
* if not specified the default serdes defined in the configs will be used
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
* @deprecated use {@code groupByKey(Serialized)}
*/
@Deprecated
KGroupedStream<K, V> groupByKey(final Serde<K> keySerde,
final Serde<V> valSerde);
@@ -1065,7 +1092,7 @@ public interface KStream<K, V> {
* and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
* <p>
* This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
* If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serde, Serde)} instead.
* If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serialized)} instead.
*
* @param selector a {@link KeyValueMapper} that computes a new key for grouping
* @param <KR> the key type of the result {@link KGroupedStream}
@@ -1073,6 +1100,32 @@ public interface KStream<K, V> {
*/
<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector);
/**
* Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
* and {@link Serde}s as specified by {@link Serialized}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the original values.
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p>
* Because a new key is selected, an internal repartitioning topic will be created in Kafka.
* This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
* an internally generated name, and "-repartition" is a fixed suffix.
* You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
* <p>
* All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
* and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
* <p>
* This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
*
* @param selector a {@link KeyValueMapper} that computes a new key for grouping
* @param <KR> the key type of the result {@link KGroupedStream}
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
*/
<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
final Serialized<KR, V> serialized);
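And a sketch of the key-changing variant (selector and serdes are illustrative); as documented above, a repartition topic is always created here because a new key is selected:

// Re-key the stream on its value; the new key type KR may differ from the original
KGroupedStream<String, String> byValue = stream.groupBy(
    (key, value) -> value,
    Serialized.with(Serdes.String(), Serdes.String()));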
/**
* Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
@@ -1100,7 +1153,9 @@ public interface KStream<K, V> {
* @param <KR> the key type of the result {@link KGroupedStream}
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
* @see #groupByKey()
* @deprecated use {@code groupBy(KeyValueMapper, Serialized)}
*/
@Deprecated
<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
final Serde<KR> keySerde,
final Serde<V> valSerde);

View File

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
/**
* The class that is used to capture the key and value {@link Serde}s used when performing
* {@link KStream#groupBy(KeyValueMapper, Serialized)} and {@link KStream#groupByKey(Serialized)} operations.
*
* @param <K> the key type
* @param <V> the value type
*/
public class Serialized<K, V> {
private Serde<K> keySerde;
private Serde<V> valueSerde;
private Serialized(final Serde<K> keySerde,
final Serde<V> valueSerde) {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
}
public Serde<K> keySerde() {
return keySerde;
}
public Serde<V> valueSerde() {
return valueSerde;
}
/**
* Construct a {@code Serialized} instance with the provided key and value {@link Serde}s.
* If the {@link Serde} params are {@code null} the default serdes defined in the configs will be used.
*
* @param keySerde keySerde that will be used to materialize a stream
* if not specified the default serdes defined in the configs will be used
* @param valueSerde valueSerde that will be used to materialize a stream
* if not specified the default serdes defined in the configs will be used
* @param <K> the key type
* @param <V> the value type
* @return a new instance of {@link Serialized} configured with the provided serdes
*/
public static <K, V> Serialized<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new Serialized<>(keySerde, valueSerde);
}
/**
* Construct a {@code Serialized} instance with the provided key {@link Serde}.
* If the {@link Serde} param is {@code null} the default key serde defined in the configs will be used.
*
* @param keySerde keySerde that will be used to materialize a stream
* if not specified the default serdes defined in the configs will be used
* @return a new instance of {@link Serialized} configured with the provided key serde
*/
public Serialized<K, V> withKeySerde(final Serde<K> keySerde) {
return new Serialized<>(keySerde, null);
}
/**
* Construct a {@code Serialized} instance with the provided value {@link Serde}.
* If the {@link Serde} param is {@code null} the default value serde defined in the configs will be used.
*
* @param valueSerde valueSerde that will be used to materialize a stream
* if not specified the default serdes defined in the configs will be used
* @return a new instance of {@link Serialized} configured with the provided value serde
*/
public Serialized<K, V> withValueSerde(final Serde<V> valueSerde) {
return new Serialized<>(null, valueSerde);
}
}
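A behavioral detail worth noting in the class above: withKeySerde and withValueSerde build a fresh instance and leave the other serde null (so the configured default applies downstream) rather than copying it from the receiver. A short illustration, with illustrative serdes:

Serialized<String, Long> both = Serialized.with(Serdes.String(), Serdes.Long());
Serialized<String, Long> keyOnly = both.withKeySerde(Serdes.String());     // valueSerde() is null: config default applies
Serialized<String, Long> valueOnly = both.withValueSerde(Serdes.Long());   // keySerde() is null: config default applies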

View File

@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.PrintForeachAction;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -714,7 +715,21 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector) {
return groupBy(selector, null, null);
return groupBy(selector, Serialized.<K1, V>with(null, null));
}
@Override
public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
final Serialized<KR, V> serialized) {
Objects.requireNonNull(selector, "selector can't be null");
Objects.requireNonNull(serialized, "serialized can't be null");
String selectName = internalSelectKey(selector);
return new KGroupedStreamImpl<>(builder,
selectName,
sourceNodes,
serialized.keySerde(),
serialized.valueSerde(),
true);
}
@Override
@@ -722,28 +737,29 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final Serde<K1> keySerde,
final Serde<V> valSerde) {
Objects.requireNonNull(selector, "selector can't be null");
String selectName = internalSelectKey(selector);
return new KGroupedStreamImpl<>(builder,
selectName,
sourceNodes,
keySerde,
valSerde, true);
return groupBy(selector, Serialized.with(keySerde, valSerde));
}
@Override
public KGroupedStream<K, V> groupByKey() {
return groupByKey(null, null);
return groupByKey(Serialized.<K, V>with(null, null));
}
@Override
public KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized) {
return new KGroupedStreamImpl<>(builder,
this.name,
sourceNodes,
serialized.keySerde(),
serialized.valueSerde(),
this.repartitionRequired);
}
@Override
public KGroupedStream<K, V> groupByKey(final Serde<K> keySerde,
final Serde<V> valSerde) {
return new KGroupedStreamImpl<>(builder,
this.name,
sourceNodes,
keySerde,
valSerde,
this.repartitionRequired);
return groupByKey(Serialized.with(keySerde, valSerde));
}
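Net effect of the refactor above: the deprecated serde-based overloads are now thin wrappers over the Serialized variants, so the two call forms below (serdes illustrative) run through the same code path:

stream.groupByKey(Serdes.String(), Serdes.String());                   // deprecated; delegates to...
stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));  // ...this new overload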
private static <K, V> StateStoreSupplier createWindowedStateStore(final JoinWindows windows,

View File

@@ -34,6 +34,7 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.IntegrationTest;
@@ -103,8 +104,7 @@ public class KStreamAggregationDedupIntegrationTest {
groupedStream = stream
.groupBy(
mapper,
Serdes.String(),
Serdes.String());
Serialized.with(Serdes.String(), Serdes.String()));
reducer = new Reducer<String>() {
@Override
@@ -225,7 +225,7 @@ public class KStreamAggregationDedupIntegrationTest {
produceMessages(timestamp);
produceMessages(timestamp);
stream.groupByKey(Serdes.Integer(), Serdes.String())
stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String()))
.count(TimeWindows.of(500L), "count-windows")
.toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
@Override

View File

@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
@@ -411,7 +412,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(timestamp);
produceMessages(timestamp);
stream.groupByKey(Serdes.Integer(), Serdes.String())
stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String()))
.count(TimeWindows.of(500L), "count-windows")
.toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
@Override
@@ -513,7 +514,7 @@ public class KStreamAggregationIntegrationTest {
final Map<Windowed<String>, Long> results = new HashMap<>();
final CountDownLatch latch = new CountDownLatch(11);
builder.stream(Serdes.String(), Serdes.String(), userSessionsStream)
.groupByKey(Serdes.String(), Serdes.String())
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.count(SessionWindows.with(sessionGap).until(maintainMillis), "UserSessionsStore")
.toStream()
.foreach(new ForeachAction<Windowed<String>, Long>() {
@@ -600,7 +601,7 @@ public class KStreamAggregationIntegrationTest {
final CountDownLatch latch = new CountDownLatch(11);
final String userSessionsStore = "UserSessionsStore";
builder.stream(Serdes.String(), Serdes.String(), userSessionsStream)
.groupByKey(Serdes.String(), Serdes.String())
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.reduce(new Reducer<String>() {
@Override
public String apply(final String value1, final String value2) {

View File

@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
@@ -242,7 +243,7 @@ public class KStreamKTableJoinIntegrationTest {
}
})
// Compute the total per region by summing the individual click counts per region.
.groupByKey(stringSerde, longSerde)
.groupByKey(Serialized.with(stringSerde, longSerde))
.reduce(new Reducer<Long>() {
@Override
public Long apply(final Long value1, final Long value2) {

View File

@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
@@ -68,7 +69,7 @@ public class KGroupedStreamImplTest {
@Before
public void before() {
final KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), TOPIC);
groupedStream = stream.groupByKey(Serdes.String(), Serdes.String());
groupedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));
}
@Test(expected = NullPointerException.class)

View File

@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
@@ -60,8 +61,7 @@ public class KStreamWindowAggregateTest {
KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
KTable<Windowed<String>, String> table2 =
stream1.groupByKey(strSerde,
strSerde)
stream1.groupByKey(Serialized.with(strSerde, strSerde))
.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
TimeWindows.of(10).advanceBy(5),
@@ -153,7 +153,7 @@ public class KStreamWindowAggregateTest {
KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
KTable<Windowed<String>, String> table1 =
stream1.groupByKey(strSerde, strSerde)
stream1.groupByKey(Serialized.with(strSerde, strSerde))
.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
TimeWindows.of(10).advanceBy(5),
@@ -164,7 +164,7 @@ public class KStreamWindowAggregateTest {
KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
KTable<Windowed<String>, String> table2 =
stream2.groupByKey(strSerde, strSerde)
stream2.groupByKey(Serialized.with(strSerde, strSerde))
.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
TimeWindows.of(10).advanceBy(5),

View File

@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -353,7 +354,7 @@ public class YahooBenchmark {
// calculate windowed counts
keyedByCampaign
.groupByKey(Serdes.String(), Serdes.String())
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.count(TimeWindows.of(10 * 1000), "time-windows");
return new KafkaStreams(builder.build(), streamConfig);

View File

@@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -119,7 +120,7 @@ public class SmokeTestClient extends SmokeTestUtil {
// min
KGroupedStream<String, Integer>
groupedData =
data.groupByKey(stringSerde, intSerde);
data.groupByKey(Serialized.with(stringSerde, intSerde));
groupedData.aggregate(
new Initializer<Integer>() {