diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html index 86f6c7855a3..d9dd2207395 100644 --- a/docs/streams/developer-guide/dsl-api.html +++ b/docs/streams/developer-guide/dsl-api.html @@ -551,7 +551,7 @@ // When the key and/or value types do not match the configured // default serdes, we must explicitly specify serdes. KGroupedStream<byte[], String> groupedStream = stream.groupByKey( - Serialized.with( + Grouped.with( Serdes.ByteArray(), /* key */ Serdes.String()) /* value */ ); @@ -594,7 +594,7 @@ // Group the stream by a new key and key type KGroupedStream<String, String> groupedStream = stream.groupBy( (key, value) -> value, - Serialized.with( + Grouped.with( Serdes.String(), /* key (note: type was modified) */ Serdes.String()) /* value */ ); @@ -602,7 +602,7 @@ // Group the table by a new key and key type, and also modify the value and value type. KGroupedTable<String, Integer> groupedTable = table.groupBy( (key, value) -> KeyValue.pair(value, value.length()), - Serialized.with( + Grouped.with( Serdes.String(), /* key (note: type was modified) */ Serdes.Integer()) /* value (note: type was modified) */ ); @@ -618,7 +618,7 @@ return value; } }, - Serialized.with( + Grouped.with( Serdes.String(), /* key (note: type was modified) */ Serdes.String()) /* value */ ); @@ -631,7 +631,7 @@ return KeyValue.pair(value, value.length()); } }, - Serialized.with( + Grouped.with( Serdes.String(), /* key (note: type was modified) */ Serdes.Integer()) /* value (note: type was modified) */ ); @@ -1330,7 +1330,7 @@ KStream<String, Integer> wordCounts = ...; KGroupedStream<String, Integer> groupedStream = wordCounts - .groupByKey(Serialized.with(Serdes.String(), Serdes.Integer())); + .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer())); KTable<String, Integer> aggregated = groupedStream.aggregate( () -> 0, /* initializer */ @@ -3355,18 +3355,18 @@ object WordCountApplication extends App { } } -
In the above code snippet, we don't have to provide any SerDes, Serialized
, Produced
, Consumed
or Joined
explicitly. They will also not be dependent on any SerDes specified in the config. In fact all SerDes specified in the config will be ignored by the Scala APIs. All SerDes and Serialized
, Produced
, Consumed
or Joined
will be handled through implicit SerDes as discussed later in the Implicit SerDes section. The complete independence from configuration based SerDes is what makes this library completely typesafe. Any missing instances of SerDes, Serialized
, Produced
, Consumed
or Joined
will be flagged as a compile time error.
In the above code snippet, we don't have to provide any SerDes, Grouped
, Produced
, Consumed
or Joined
explicitly. They will also not be dependent on any SerDes specified in the config. In fact all SerDes specified in the config will be ignored by the Scala APIs. All SerDes and Grouped
, Produced
, Consumed
or Joined
will be handled through implicit SerDes as discussed later in the Implicit SerDes section. The complete independence from configuration based SerDes is what makes this library completely typesafe. Any missing instances of SerDes, Grouped
, Produced
, Consumed
or Joined
will be flagged as a compile time error.
One of the common complaints of Scala users with the Java API has been the repetitive usage of the SerDes in API invocations. Many of the APIs need to take the SerDes through abstractions like Serialized
, Produced
, Consumed
or Joined
. And the user has to supply them every time through the with function of these classes.
The library uses the power of Scala implicit parameters to alleviate this concern. As a user you can provide implicit SerDes or implicit values of Serialized
, Produced
, Consumed
or Joined
once and make your code less verbose. In fact you can just have the implicit SerDes in scope and the library will make the instances of Serialized
, Produced
, Consumed
or Joined
available in scope.
One of the common complaints of Scala users with the Java API has been the repetitive usage of the SerDes in API invocations. Many of the APIs need to take the SerDes through abstractions like Grouped
, Produced
, Consumed
or Joined
. And the user has to supply them every time through the with function of these classes.
The library uses the power of Scala implicit parameters to alleviate this concern. As a user you can provide implicit SerDes or implicit values of Grouped
, Produced
, Consumed
or Joined
once and make your code less verbose. In fact you can just have the implicit SerDes in scope and the library will make the instances of Grouped
, Produced
, Consumed
or Joined
available in scope.
The library also bundles all implicit SerDes of the commonly used primitive types in a Scala module - so just import the module vals and have all SerDes in scope. A similar strategy of modular implicits can be adopted for any user-defined SerDes as well (User-defined SerDes are discussed in the next section).
Here's an example:
// DefaultSerdes brings into scope implicit SerDes (mostly for primitives) -// that will set up all Serialized, Produced, Consumed and Joined instances. -// So all APIs below that accept Serialized, Produced, Consumed or Joined will +// that will set up all Grouped, Produced, Consumed and Joined instances. +// So all APIs below that accept Grouped, Produced, Consumed or Joined will // get these instances automatically import Serdes._ @@ -3376,7 +3376,7 @@ val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic) val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic) -// The following code fragment does not have a single instance of Serialized, +// The following code fragment does not have a single instance of Grouped, // Produced, Consumed or Joined supplied explicitly. // All of them are taken care of by the implicit SerDes imported by DefaultSerdes val clicksPerRegion: KTable[String, Long] = diff --git a/docs/streams/developer-guide/interactive-queries.html b/docs/streams/developer-guide/interactive-queries.html index ca0936ec64c..d1b9d9b87d1 100644 --- a/docs/streams/developer-guide/interactive-queries.html +++ b/docs/streams/developer-guide/interactive-queries.html @@ -136,7 +136,7 @@ // Define the processing topology (here: WordCount) KGroupedStream<String, String> groupedByWord = textLines .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) - .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde)); + .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde)); // Create a key-value store named "CountsKeyValueStore" for the all-time word counts groupedByWord.count(Materialized.<String, String, KeyValueStore<Bytes, byte[]>as("CountsKeyValueStore")); @@ -199,7 +199,7 @@ // Define the processing topology (here: WordCount) KGroupedStream<String, String> groupedByWord = textLines .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) - .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde)); + .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde)); // Create a window state store named "CountsWindowStore" that contains the word counts for every minute groupedByWord.windowedBy(TimeWindows.of(Duration.ofSeconds(60))) @@ -392,7 +392,7 @@ interactive queries final KGroupedStream<String, String> groupedByWord = textLines .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) - .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde)); + .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde)); // This call to `count()` creates a state store named "word-count". // The state store is discoverable and can be queried interactively. diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 3bc82a14dc7..b26f3c339cf 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -77,6 +77,19 @@ KIP-321. ++ We've added a new class
+Grouped
and deprecatedSerialized
. The intent of addingGrouped
is the ability to + name repartition topics created when performing aggregation operations. Users can name the potential repartition topic using the +Grouped#as()
method which takes aString
and is used as part of the repartition topic name. The resulting repartition + topic name will still follow the pattern of${application-id}->name<-repartition
. TheGrouped
class is now favored over +Serialized
inKStream#groupByKey()
,KStream#groupBy()
, andKTable#groupBy()
. + Note that Kafka Streams does not automatically create repartition topics for aggregation operations. + + Additionally, we've updated theJoined
class with a new methodJoined#withName
+ enabling users to name any repartition topics required for performing Stream/Stream or Stream/Table join. For more details repartition + topic naming, see KIP-372. +We've added a new config named
max.task.idle.ms
to allow users specify how to handle out-of-order data within a task that may be processing multiple topic-partitions (see Out-of-Order Handling section for more details).