diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index 8289423d3d7..8c45d0c7976 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -72,9 +72,9 @@ settings.put(... , ...);
86400000
null
Serde
interface.null
---- Serde for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface. -
-- Note that this config is only used by plain consumer/producer clients that set a windowed de/serializer type via configs. For Kafka Streams applications that deal with windowed types, you must pass in the inner serde type when you instantiate the windowed serde object for your topology. -
-
diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html index afa6397c5ef..2bc2d7d5d0e 100644 --- a/docs/streams/developer-guide/datatypes.html +++ b/docs/streams/developer-guide/datatypes.html @@ -48,9 +48,10 @@-
- Primitive and basic types
- JSON
-- Implementing custom serdes
+- Window Serdes
+- Implementing custom serdes
Kafka Streams DSL for Scala Implicit Serdes +Kafka Streams DSL for Scala Implicit Serdes Configuring Serdes
@@ -103,7 +104,7 @@ userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.L<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>2.8.0</version> + <version>{{fullDotVersion}}</version> </dependency>
This artifact provides the following serde implementations under the package org.apache.kafka.common.serialization, which you can leverage when e.g., defining default serializers in your Streams configuration.
@@ -163,6 +164,76 @@ userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.L
As shown in the example, you can use JSONSerdes inner classes
+Serdes.serdeFrom(<serializerInstance>, <deserializerInstance>)
to construct JSON compatible serializers and deserializers.+Window Serdes
+Apache Kafka Streams includes serde implementations for windowed types in + its
+kafka-streams
Maven artifact:+<dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-streams</artifactId> + <version>{{fullDotVersion}}</version> +</dependency>
This artifact provides the following windowed serde implementations under the package org.apache.kafka.streams.kstream:
+ +Serdes:
++
+ +- +
WindowedSerdes.TimeWindowedSerde<T>
- +
WindowedSerdes.SessionWindowedSerde<T>
Serializers:
++
+ +- +
TimeWindowedSerializer<T>
- +
SessionWindowedSerializer<T>
Deserializers:
++
+- +
TimeWindowedDeserializer<T>
- +
SessionWindowedDeserializer<T>
Usage in Code
+When using windowed serdes in your application code, you typically create instances via constructors or factory methods:
++ +// Time windowed serde - using factory method +Serde<Windowed<String>> timeWindowedSerde = + WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L); + +// Time windowed serde - using constructor +Serde<Windowed<String>> timeWindowedSerde2 = + new WindowedSerdes.TimeWindowedSerde<>(Serdes.String(), 500L); + +// Session windowed serde - using factory method +Serde<Windowed<String>> sessionWindowedSerde = + WindowedSerdes.sessionWindowedSerdeFrom(String.class); + +// Session windowed serde - using constructor +Serde<Windowed<String>> sessionWindowedSerde2 = + new WindowedSerdes.SessionWindowedSerde<>(Serdes.String()); + +// Using individual serializers/deserializers +TimeWindowedSerializer<String> serializer = new TimeWindowedSerializer<>(Serdes.String().serializer()); +TimeWindowedDeserializer<String> deserializer = new TimeWindowedDeserializer<>(Serdes.String().deserializer(), 500L);
Usage in Command Line
+When using command-line tools (like
+bin/kafka-console-consumer.sh
), you can configure windowed deserializers by passing the inner class and window size via configuration properties. The property names use a prefix pattern:+ +# Time windowed deserializer configuration +--property print.key=true \ +--property key.deserializer=org.apache.kafka.streams.kstream.TimeWindowedDeserializer \ +--property key.deserializer.windowed.inner.deserializer.class=org.apache.kafka.common.serialization.StringDeserializer \ +--property key.deserializer.window.size.ms=500 + +# Session windowed deserializer configuration +--property print.key=true \ +--property key.deserializer=org.apache.kafka.streams.kstream.SessionWindowedDeserializer \ +--property key.deserializer.windowed.inner.deserializer.class=org.apache.kafka.common.serialization.StringDeserializer
Deprecated Configs
+The following
+StreamsConfig
parameters are deprecated in favor of passing parameters directly to serializer/deserializer constructors:+
+- +
StreamsConfig.WINDOWED_INNER_CLASS_SERDE
is deprecated in favor ofTimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS
andTimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS
- +
StreamsConfig.WINDOW_SIZE_MS_CONFIG
is deprecated in favor ofTimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG