diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index 3d39bdb24e6..458b47b149c 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -55,7 +55,7 @@

Configuring Serdes

 Serdes specified in the Streams configuration are used as the default in your Kafka Streams application.
-Because this config's default is null, you must either set a default SerDe by using this
+Because this config's default is null, you must either set a default Serde by using this
 configuration or pass in Serdes explicitly, as described below.

import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
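As a reading aid (not part of the patch), here is a minimal sketch of setting the default Serdes through the Streams configuration that this hunk refers to; the String/Long serde choices are illustrative, and it relies on the Serdes and StreamsConfig imports shown above.

import java.util.Properties;

Properties settings = new Properties();
// Default serde for record keys, used unless an operation overrides it explicitly.
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for record values, used unless an operation overrides it explicitly.
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());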
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 08005368798..d3a844aa95a 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -74,8 +74,8 @@
                 
  • Kafka Streams DSL for Scala
@@ -254,11 +254,11 @@ KStream<String, Long> wordCounts = builder.stream(
     Serdes.String(), /* key serde */
     Serdes.Long()    /* value serde */
 );
-If you do not specify SerDes explicitly, the default SerDes from the
+If you do not specify Serdes explicitly, the default Serdes from the
 configuration are used.
-You must specify SerDes explicitly if the key or value types of the records in the Kafka input
-topics do not match the configured default SerDes. For information about configuring default SerDes, available
-SerDes, and implementing your own custom SerDes see Data Types and Serialization.
+You must specify Serdes explicitly if the key or value types of the records in the Kafka input
+topics do not match the configured default Serdes. For information about configuring default Serdes, available
+Serdes, and implementing your own custom Serdes see Data Types and Serialization.

    Several variants of stream exist. For example, you can specify a regex pattern for input topics to read from (note that all matching topics will be part of the same input topic group, and the work will not be parallelized for different topics if subscribed to in this way).
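For orientation (not part of the patch), a sketch of the equivalent source definition with explicit Serdes supplied through the Consumed helper; the topic name is hypothetical.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Explicit serdes apply to this source only; other operations still fall back to the configured defaults.
KStream<String, Long> wordCounts = builder.stream(
    "word-counts-input-topic",                     // hypothetical topic name
    Consumed.with(Serdes.String(), Serdes.Long())  // key serde, value serde
);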

@@ -278,11 +278,11 @@ KStream<String, Long> wordCounts = builder.stream(
 state store that backs the table). This is required for supporting interactive queries against the table.
 When a name is not provided the table will not be queryable and an internal name will be provided for the state store.
-If you do not specify SerDes explicitly, the default SerDes from the
+If you do not specify Serdes explicitly, the default Serdes from the
 configuration are used.
-You must specify SerDes explicitly if the key or value types of the records in the Kafka input
-topics do not match the configured default SerDes. For information about configuring default SerDes, available
-SerDes, and implementing your own custom SerDes see Data Types and Serialization.
+You must specify Serdes explicitly if the key or value types of the records in the Kafka input
+topics do not match the configured default Serdes. For information about configuring default Serdes, available
+Serdes, and implementing your own custom Serdes see Data Types and Serialization.

    Several variants of table exist, for example to specify the auto.offset.reset policy to be used when reading from the input topic.
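To make the queryable-store discussion above concrete (again not part of the patch), a sketch of table with an explicitly named, serde-aware state store; the topic and store names are illustrative.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

// Naming the backing store makes the table queryable; the explicit serdes override the defaults for this table only.
KTable<String, Long> wordCounts = builder.table(
    "word-counts-input-topic",  // hypothetical topic name
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-counts-store")  // hypothetical store name
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
);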

@@ -315,10 +315,10 @@ GlobalKTable<String, Long> wordCounts = builder.globalTable(
     .withKeySerde(Serdes.String())   /* key serde */
     .withValueSerde(Serdes.Long())   /* value serde */
 );
-You must specify SerDes explicitly if the key or value types of the records in the Kafka input
-topics do not match the configured default SerDes. For information about configuring default SerDes, available
-SerDes, and implementing your own custom SerDes see Data Types and Serialization.
-Several variants of globalTable exist to e.g. specify explicit SerDes.
+You must specify Serdes explicitly if the key or value types of the records in the Kafka input
+topics do not match the configured default Serdes. For information about configuring default Serdes, available
+Serdes, and implementing your own custom Serdes see Data Types and Serialization.
+Several variants of globalTable exist to e.g. specify explicit Serdes.
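One such variant, sketched here as a reading aid (not part of the patch), passes the explicit Serdes through Consumed instead of a Materialized store; the topic name is hypothetical.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;

StreamsBuilder builder = new StreamsBuilder();

// Explicit serdes for the global table's input topic, without naming a queryable store.
GlobalKTable<String, Long> wordCounts = builder.globalTable(
    "word-counts-global-topic",  // hypothetical topic name
    Consumed.with(Serdes.String(), Serdes.Long())
);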

@@ -518,10 +518,10 @@ stream.foreach(
 (details)
 Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly
 partitioned (“keyed”) for subsequent operations.
-When to set explicit SerDes:
-Variants of groupByKey exist to override the configured default SerDes of your application, which you
+When to set explicit Serdes:
+Variants of groupByKey exist to override the configured default Serdes of your application, which you
 must do if the key and/or value types of the resulting KGroupedStream do not match the configured default
-SerDes.
+Serdes.
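A sketch of such a variant (not part of the patch), using the Grouped helper for a stream whose byte[]/String types are assumed to differ from the configured defaults; the topic name is hypothetical.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
// A stream whose byte[]/String types differ from the assumed defaults ("input-topic" is hypothetical).
KStream<byte[], String> stream = builder.stream(
    "input-topic", Consumed.with(Serdes.ByteArray(), Serdes.String()));

// groupByKey variant with explicit serdes, needed because the types differ from the configured defaults.
KGroupedStream<byte[], String> groupedStream = stream.groupByKey(
    Grouped.with(Serdes.ByteArray(), Serdes.String()));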

    Note

Grouping vs. Windowing:
@@ -562,10 +562,10 @@ KGroupedStream<byte[], String> groupedStream = stream.groupByKey(
 KTable (details)

    Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly partitioned (“keyed”) for subsequent operations.

-When to set explicit SerDes:
-Variants of groupBy exist to override the configured default SerDes of your application, which you must
+When to set explicit Serdes:
+Variants of groupBy exist to override the configured default Serdes of your application, which you must
 do if the key and/or value types of the resulting KGroupedStream or KGroupedTable do not match the
-configured default SerDes.
+configured default Serdes.
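A sketch of one such variant (not part of the patch): because groupBy selects a new key, the grouped key type often no longer matches the configured default key serde; the re-keying rule and topic name here are illustrative.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], String> stream = builder.stream(
    "input-topic", Consumed.with(Serdes.ByteArray(), Serdes.String()));  // hypothetical topic

// groupBy selects a new key (here: the String value), so the grouped key type no longer matches
// the original byte[] key and the key serde is supplied explicitly alongside the value serde.
KGroupedStream<String, String> groupedByValue = stream.groupBy(
    (key, value) -> value,
    Grouped.with(Serdes.String(), Serdes.String()));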

    Note

Grouping vs. Windowing:
@@ -3566,12 +3566,12 @@ groupedTable
 (KStream details)

    When to provide serdes explicitly:

A variant of to exists that enables you to specify how the data is produced by using a Produced
instance to specify, for example, a StreamPartitioner that gives you control over
@@ -3621,7 +3621,7 @@ stream.to("my-stream-output-topic", Produced.with(Serdes.String(), Ser
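Sketched as a reading aid (not part of the patch), a Produced instance carrying both explicit Serdes and a custom StreamPartitioner; the value serde, input topic, and routing rule are assumptions.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> stream = builder.stream(
    "word-counts-input-topic", Consumed.with(Serdes.String(), Serdes.Long()));  // hypothetical topic

// A custom partitioner; the routing rule is purely illustrative.
StreamPartitioner<String, Long> partitioner =
    (topic, key, value, numPartitions) -> (key.hashCode() & 0x7fffffff) % numPartitions;

// Produced bundles the explicit serdes and the partitioner for the write to the output topic.
stream.to("my-stream-output-topic", Produced.with(Serdes.String(), Serdes.Long(), partitioner));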

 1. Additional type annotations - The Java APIs use Java generics in a way that are not fully compatible with the type inferencer of the Scala compiler. Hence the user has to add type annotations to the Scala code, which seems rather non-idiomatic in Scala.
 2. Verbosity - In some cases the Java APIs appear too verbose compared to idiomatic Scala.
-3. Type Unsafety - The Java APIs offer some options where the compile time type safety is sometimes subverted and can result in runtime errors. This stems from the fact that the SerDes defined as part of config are not type checked during compile time. Hence any missing SerDes can result in runtime errors.
+3. Type Unsafety - The Java APIs offer some options where the compile time type safety is sometimes subverted and can result in runtime errors. This stems from the fact that the Serdes defined as part of config are not type checked during compile time. Hence any missing Serdes can result in runtime errors.

The Kafka Streams DSL for Scala library is a wrapper over the existing Java APIs for Kafka Streams DSL that addresses the concerns raised above. It does not attempt to provide idiomatic Scala APIs that one would implement in a Scala library developed from scratch. The intention is to make the Java APIs more usable in Scala through better type inferencing, enhanced expressiveness, and lesser boilerplates.
@@ -3648,7 +3648,7 @@ stream.to("my-stream-output-topic", Produced.with(Serdes.String(), Ser

    The library also has several utility abstractions and modules that the user needs to use for proper semantics.

    The library is cross-built with Scala 2.12 and 2.13. To reference the library compiled against Scala {{scalaVersion}} include the following in your maven pom.xml add the following:

    <dependency>
    @@ -3698,15 +3698,15 @@ object WordCountApplication extends App {
          streams.close(Duration.ofSeconds(10))
       }
     }
-In the above code snippet, we don't have to provide any SerDes, Grouped, Produced, Consumed or Joined explicitly. They will also not be dependent on any SerDes specified in the config. In fact all SerDes specified in the config will be ignored by the Scala APIs. All SerDes and Grouped, Produced, Consumed or Joined will be handled through implicit SerDes as discussed later in the Implicit SerDes section. The complete independence from configuration based SerDes is what makes this library completely typesafe. Any missing instances of SerDes, Grouped, Produced, Consumed or Joined will be flagged as a compile time error.
+In the above code snippet, we don't have to provide any Serdes, Grouped, Produced, Consumed or Joined explicitly. They will also not be dependent on any Serdes specified in the config. In fact all Serdes specified in the config will be ignored by the Scala APIs. All Serdes and Grouped, Produced, Consumed or Joined will be handled through implicit Serdes as discussed later in the Implicit Serdes section. The complete independence from configuration based Serdes is what makes this library completely typesafe. Any missing instances of Serdes, Grouped, Produced, Consumed or Joined will be flagged as a compile time error.
-Implicit SerDes
-One of the common complaints of Scala users with the Java API has been the repetitive usage of the SerDes in API invocations. Many of the APIs need to take the SerDes through abstractions like Grouped, Produced, Repartitioned, Consumed or Joined. And the user has to supply them every time through the with function of these classes.
-The library uses the power of Scala implicit parameters to alleviate this concern. As a user you can provide implicit SerDes or implicit values of Grouped, Produced, Repartitioned, Consumed or Joined once and make your code less verbose. In fact you can just have the implicit SerDes in scope and the library will make the instances of Grouped, Produced, Consumed or Joined available in scope.
-The library also bundles all implicit SerDes of the commonly used primitive types in a Scala module - so just import the module vals and have all SerDes in scope. A similar strategy of modular implicits can be adopted for any user-defined SerDes as well (User-defined SerDes are discussed in the next section).
+Implicit Serdes
+One of the common complaints of Scala users with the Java API has been the repetitive usage of the Serdes in API invocations. Many of the APIs need to take the Serdes through abstractions like Grouped, Produced, Repartitioned, Consumed or Joined. And the user has to supply them every time through the with function of these classes.
+The library uses the power of Scala implicit parameters to alleviate this concern. As a user you can provide implicit Serdes or implicit values of Grouped, Produced, Repartitioned, Consumed or Joined once and make your code less verbose. In fact you can just have the implicit Serdes in scope and the library will make the instances of Grouped, Produced, Consumed or Joined available in scope.
+The library also bundles all implicit Serdes of the commonly used primitive types in a Scala module - so just import the module vals and have all Serdes in scope. A similar strategy of modular implicits can be adopted for any user-defined Serdes as well (User-defined Serdes are discussed in the next section).

    Here's an example:

-// DefaultSerdes brings into scope implicit SerDes (mostly for primitives)
+// DefaultSerdes brings into scope implicit Serdes (mostly for primitives)
     // that will set up all Grouped, Produced, Consumed and Joined instances.
     // So all APIs below that accept Grouped, Produced, Consumed or Joined will
     // get these instances automatically
    @@ -3720,7 +3720,7 @@ val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
     
     // The following code fragment does not have a single instance of Grouped,
     // Produced, Consumed or Joined supplied explicitly.
    -// All of them are taken care of by the implicit SerDes imported by DefaultSerdes
    +// All of them are taken care of by the implicit Serdes imported by DefaultSerdes
     val clicksPerRegion: KTable[String, Long] =
       userClicksStream
         .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
    @@ -3731,15 +3731,15 @@ val clicksPerRegion: KTable[String, Long] =
     clicksPerRegion.toStream.to(outputTopic)

    Quite a few things are going on in the above code snippet that may warrant a few lines of elaboration:

-1. The code snippet does not depend on any config defined SerDes. In fact any SerDes defined as part of the config will be ignored.
-2. All SerDes are picked up from the implicits in scope. And import Serdes._ brings all necessary SerDes in scope.
+1. The code snippet does not depend on any config defined Serdes. In fact any Serdes defined as part of the config will be ignored.
+2. All Serdes are picked up from the implicits in scope. And import Serdes._ brings all necessary Serdes in scope.
 3. This is an example of compile time type safety that we don't have in the Java APIs.
 4. The code looks less verbose and more focused towards the actual transformation that it does on the data stream.
-User-Defined SerDes
-When the default primitive SerDes are not enough and we need to define custom SerDes, the usage is exactly the same as above. Just define the implicit SerDes and start building the stream transformation. Here's an example with AvroSerde:
+User-Defined Serdes
+When the default primitive Serdes are not enough and we need to define custom Serdes, the usage is exactly the same as above. Just define the implicit Serdes and start building the stream transformation. Here's an example with AvroSerde:

    // domain object as a case class
     case class UserClicks(clicks: Long)
     
    @@ -3747,7 +3747,7 @@ case class UserClicks(clicks: Long)
     // serialize as avro
     implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde
     
    -// Primitive SerDes
    +// Primitive Serdes
     import Serdes._
     
     // And then business as usual ..
    @@ -3772,7 +3772,7 @@ val clicksPerRegion: KTable[String, Long] =
     
     // Write the (continuously updating) results to the output topic.
     clicksPerRegion.toStream.to(outputTopic)
-A complete example of user-defined SerDes can be found in a test class within the library.
+A complete example of user-defined Serdes can be found in a test class within the library.

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index eb3b125374d..6a627319e4e 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -777,7 +777,7 @@
 Kafka Streams DSL for Scala is a new Kafka Streams client library available for developers authoring Kafka Streams applications in Scala. It wraps core Kafka Streams DSL types to make it easier to call when interoperating with Scala code. For example, it includes higher order functions as parameters for transformations avoiding the need anonymous classes in Java 7 or experimental SAM type conversions in Scala 2.11, automatic conversion between Java and Scala collection types, a way
-to implicitly provide SerDes to reduce boilerplate from your application and make it more typesafe, and more! For more information see the
+to implicitly provide Serdes to reduce boilerplate from your application and make it more typesafe, and more! For more information see the
 Kafka Streams DSL for Scala documentation and KIP-270.