mirror of https://github.com/apache/kafka.git
KAFKA-15307: Kafka Streams configuration docs outdated (#20408)
Added new section for window serdes and update Streams configuration page accordingly. Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
aa3a64e92c
commit
639492cc0a
|
@ -72,9 +72,9 @@ settings.put(... , ...);</code></pre>
|
||||||
<li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a>
|
<li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a>
|
||||||
<ul>
|
<ul>
|
||||||
<li><a class="reference internal" href="#acceptable-recovery-lag" id="id27">acceptable.recovery.lag</a></li>
|
<li><a class="reference internal" href="#acceptable-recovery-lag" id="id27">acceptable.recovery.lag</a></li>
|
||||||
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">default.deserialization.exception.handler (deprecated since 4.0)</a></li>
|
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">default.deserialization.exception.handler (deprecated)</a></li>
|
||||||
<li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li>
|
<li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li>
|
||||||
<li><a class="reference internal" href="#production-exception-handler" id="id24">default.production.exception.handler (deprecated since 4.0)</a></li>
|
<li><a class="reference internal" href="#production-exception-handler" id="id24">default.production.exception.handler (deprecated)</a></li>
|
||||||
<li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
|
<li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
|
||||||
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
|
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
|
||||||
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">deserialization.exception.handler</a></li>
|
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">deserialization.exception.handler</a></li>
|
||||||
|
@ -100,7 +100,6 @@ settings.put(... , ...);</code></pre>
|
||||||
<li><a class="reference internal" href="#state-dir" id="id14">state.dir</a></li>
|
<li><a class="reference internal" href="#state-dir" id="id14">state.dir</a></li>
|
||||||
<li><a class="reference internal" href="#task-assignor-class" id="id39">task.assignor.class</a></li>
|
<li><a class="reference internal" href="#task-assignor-class" id="id39">task.assignor.class</a></li>
|
||||||
<li><a class="reference internal" href="#topology-optimization" id="id31">topology.optimization</a></li>
|
<li><a class="reference internal" href="#topology-optimization" id="id31">topology.optimization</a></li>
|
||||||
<li><a class="reference internal" href="#windowed-inner-class-serde" id="id38">windowed.inner.class.serde</a></li>
|
|
||||||
</ul>
|
</ul>
|
||||||
</li>
|
</li>
|
||||||
<li><a class="reference internal" href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka consumers and producer configuration parameters</a>
|
<li><a class="reference internal" href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka consumers and producer configuration parameters</a>
|
||||||
|
@ -543,11 +542,16 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
|
||||||
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
|
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
|
||||||
<td><code class="docutils literal"><span class="pre">86400000</span></code></td> (1 day)</td>
|
<td><code class="docutils literal"><span class="pre">86400000</span></code></td> (1 day)</td>
|
||||||
</tr>
|
</tr>
|
||||||
<tr class="row-odd"><td>window.size.ms</td>
|
<tr class="row-odd"><td>window.size.ms (Deprecated. See <a href="datatypes.html#window-serdes">Window Serdes</a> for alternatives.)</td>
|
||||||
<td>Low</td>
|
<td>Low</td>
|
||||||
<td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td>
|
<td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td>
|
||||||
<td><code class="docutils literal"><span class="pre">null</span></code></td>
|
<td><code class="docutils literal"><span class="pre">null</span></code></td>
|
||||||
</tr>
|
</tr>
|
||||||
|
<tr class="row-even"><td>windowed.inner.class.serde (Deprecated. See <a href="datatypes.html#window-serdes">Window Serdes</a> for alternatives.)</td>
|
||||||
|
<td>Low</td>
|
||||||
|
<td colspan="2">Serde for the inner class of a windowed record. Must implement the <code class="docutils literal"><span class="pre">Serde</span></code> interface.</td>
|
||||||
|
<td><code class="docutils literal"><span class="pre">null</span></code></td>
|
||||||
|
</tr>
|
||||||
</tbody>
|
</tbody>
|
||||||
</table>
|
</table>
|
||||||
<div class="section" id="acceptable-recovery-lag">
|
<div class="section" id="acceptable-recovery-lag">
|
||||||
|
@ -1224,18 +1228,6 @@ streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD
|
||||||
</p>
|
</p>
|
||||||
</div></blockquote>
|
</div></blockquote>
|
||||||
</div>
|
</div>
|
||||||
<div class="section" id="windowed.inner.class.serde">
|
|
||||||
<h4><a class="toc-backref" href="#id31">windowed.inner.class.serde</a><a class="headerlink" href="#windowed.inner.class.serde" title="Permalink to this headline"></a></h4>
|
|
||||||
<blockquote>
|
|
||||||
<div>
|
|
||||||
<p>
|
|
||||||
Serde for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface.
|
|
||||||
</p>
|
|
||||||
<p>
|
|
||||||
Note that this config is only used by plain consumer/producer clients that set a windowed de/serializer type via configs. For Kafka Streams applications that deal with windowed types, you must pass in the inner serde type when you instantiate the windowed serde object for your topology.
|
|
||||||
</p>
|
|
||||||
</div></blockquote>
|
|
||||||
</div>
|
|
||||||
<div class="section" id="upgrade-from">
|
<div class="section" id="upgrade-from">
|
||||||
<span id="streams-developer-guide-upgrade-from"></span><h4><a class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink" href="#upgrade-from" title="Permalink to this headline"></a></h4>
|
<span id="streams-developer-guide-upgrade-from"></span><h4><a class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink" href="#upgrade-from" title="Permalink to this headline"></a></h4>
|
||||||
<blockquote>
|
<blockquote>
|
||||||
|
|
|
@ -48,9 +48,10 @@
|
||||||
<ul>
|
<ul>
|
||||||
<li><a class="reference internal" href="#primitive-and-basic-types" id="id4">Primitive and basic types</a></li>
|
<li><a class="reference internal" href="#primitive-and-basic-types" id="id4">Primitive and basic types</a></li>
|
||||||
<li><a class="reference internal" href="#json" id="id6">JSON</a></li>
|
<li><a class="reference internal" href="#json" id="id6">JSON</a></li>
|
||||||
<li><a class="reference internal" href="#implementing-custom-serdes" id="id5">Implementing custom serdes</a></li>
|
<li><a class="reference internal" href="#window-serdes" id="id7">Window Serdes</a></li>
|
||||||
|
<li><a class="reference internal" href="#implementing-custom-serdes" id="id8">Implementing custom serdes</a></li>
|
||||||
</ul>
|
</ul>
|
||||||
<li><a class="reference internal" href="#scala-dsl-serdes" id="id8">Kafka Streams DSL for Scala Implicit Serdes</a></li>
|
<li><a class="reference internal" href="#scala-dsl-serdes" id="id9">Kafka Streams DSL for Scala Implicit Serdes</a></li>
|
||||||
</ul>
|
</ul>
|
||||||
<div class="section" id="configuring-serdes">
|
<div class="section" id="configuring-serdes">
|
||||||
<h2>Configuring Serdes<a class="headerlink" href="#configuring-serdes" title="Permalink to this headline"></a></h2>
|
<h2>Configuring Serdes<a class="headerlink" href="#configuring-serdes" title="Permalink to this headline"></a></h2>
|
||||||
|
@ -103,7 +104,7 @@ userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.L
|
||||||
<pre class="line-numbers"><code class="language-xml"><dependency>
|
<pre class="line-numbers"><code class="language-xml"><dependency>
|
||||||
<groupId>org.apache.kafka</groupId>
|
<groupId>org.apache.kafka</groupId>
|
||||||
<artifactId>kafka-clients</artifactId>
|
<artifactId>kafka-clients</artifactId>
|
||||||
<version>2.8.0</version>
|
<version>{{fullDotVersion}}</version>
|
||||||
</dependency></code></pre>
|
</dependency></code></pre>
|
||||||
<p>This artifact provides the following serde implementations under the package <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization">org.apache.kafka.common.serialization</a>, which you can leverage when e.g., defining default serializers in your Streams configuration.</p>
|
<p>This artifact provides the following serde implementations under the package <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization">org.apache.kafka.common.serialization</a>, which you can leverage when e.g., defining default serializers in your Streams configuration.</p>
|
||||||
<table border="1" class="docutils">
|
<table border="1" class="docutils">
|
||||||
|
@ -163,6 +164,76 @@ userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.L
|
||||||
<p>As shown in the example, you can use JSONSerdes inner classes <code class="docutils literal"><span class="pre">Serdes.serdeFrom(<serializerInstance>, <deserializerInstance>)</span></code> to construct JSON compatible serializers and deserializers.
|
<p>As shown in the example, you can use JSONSerdes inner classes <code class="docutils literal"><span class="pre">Serdes.serdeFrom(<serializerInstance>, <deserializerInstance>)</span></code> to construct JSON compatible serializers and deserializers.
|
||||||
</p>
|
</p>
|
||||||
</div>
|
</div>
|
||||||
|
<div class="section" id="window-serdes">
|
||||||
|
<h3>Window Serdes<a class="headerlink" href="#window-serdes" title="Permalink to this headline"></a></h3>
|
||||||
|
<p>Apache Kafka Streams includes serde implementations for windowed types in
|
||||||
|
its <code class="docutils literal"><span class="pre">kafka-streams</span></code> Maven artifact:</p>
|
||||||
|
<pre class="line-numbers"><code class="language-xml"><dependency>
|
||||||
|
<groupId>org.apache.kafka</groupId>
|
||||||
|
<artifactId>kafka-streams</artifactId>
|
||||||
|
<version>{{fullDotVersion}}</version>
|
||||||
|
</dependency></code></pre>
|
||||||
|
<p>This artifact provides the following windowed serde implementations under the package <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/src/main/java/org/apache/kafka/streams/kstream">org.apache.kafka.streams.kstream</a>:</p>
|
||||||
|
|
||||||
|
<p><strong>Serdes:</strong></p>
|
||||||
|
<ul class="simple">
|
||||||
|
<li><code class="docutils literal"><span class="pre">WindowedSerdes.TimeWindowedSerde<T></span></code></li>
|
||||||
|
<li><code class="docutils literal"><span class="pre">WindowedSerdes.SessionWindowedSerde<T></span></code></li>
|
||||||
|
</ul>
|
||||||
|
|
||||||
|
<p><strong>Serializers:</strong></p>
|
||||||
|
<ul class="simple">
|
||||||
|
<li><code class="docutils literal"><span class="pre">TimeWindowedSerializer<T></span></code></li>
|
||||||
|
<li><code class="docutils literal"><span class="pre">SessionWindowedSerializer<T></span></code></li>
|
||||||
|
</ul>
|
||||||
|
|
||||||
|
<p><strong>Deserializers:</strong></p>
|
||||||
|
<ul class="simple">
|
||||||
|
<li><code class="docutils literal"><span class="pre">TimeWindowedDeserializer<T></span></code></li>
|
||||||
|
<li><code class="docutils literal"><span class="pre">SessionWindowedDeserializer<T></span></code></li>
|
||||||
|
</ul>
|
||||||
|
<h4>Usage in Code</h4>
|
||||||
|
<p>When using windowed serdes in your application code, you typically create instances via constructors or factory methods:</p>
|
||||||
|
<pre class="line-numbers"><code class="language-java">// Time windowed serde - using factory method
|
||||||
|
Serde<Windowed<String>> timeWindowedSerde =
|
||||||
|
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
|
||||||
|
|
||||||
|
// Time windowed serde - using constructor
|
||||||
|
Serde<Windowed<String>> timeWindowedSerde2 =
|
||||||
|
new WindowedSerdes.TimeWindowedSerde<>(Serdes.String(), 500L);
|
||||||
|
|
||||||
|
// Session windowed serde - using factory method
|
||||||
|
Serde<Windowed<String>> sessionWindowedSerde =
|
||||||
|
WindowedSerdes.sessionWindowedSerdeFrom(String.class);
|
||||||
|
|
||||||
|
// Session windowed serde - using constructor
|
||||||
|
Serde<Windowed<String>> sessionWindowedSerde2 =
|
||||||
|
new WindowedSerdes.SessionWindowedSerde<>(Serdes.String());
|
||||||
|
|
||||||
|
// Using individual serializers/deserializers
|
||||||
|
TimeWindowedSerializer<String> serializer = new TimeWindowedSerializer<>(Serdes.String().serializer());
|
||||||
|
TimeWindowedDeserializer<String> deserializer = new TimeWindowedDeserializer<>(Serdes.String().deserializer(), 500L);</code></pre>
|
||||||
|
|
||||||
|
<h4>Usage in Command Line</h4>
|
||||||
|
<p>When using command-line tools (like <code>bin/kafka-console-consumer.sh</code>), you can configure windowed deserializers by passing the inner class and window size via configuration properties. The property names use a prefix pattern:</p>
|
||||||
|
<pre class="line-numbers"><code class="language-bash"># Time windowed deserializer configuration
|
||||||
|
--property print.key=true \
|
||||||
|
--property key.deserializer=org.apache.kafka.streams.kstream.TimeWindowedDeserializer \
|
||||||
|
--property key.deserializer.windowed.inner.deserializer.class=org.apache.kafka.common.serialization.StringDeserializer \
|
||||||
|
--property key.deserializer.window.size.ms=500
|
||||||
|
|
||||||
|
# Session windowed deserializer configuration
|
||||||
|
--property print.key=true \
|
||||||
|
--property key.deserializer=org.apache.kafka.streams.kstream.SessionWindowedDeserializer \
|
||||||
|
--property key.deserializer.windowed.inner.deserializer.class=org.apache.kafka.common.serialization.StringDeserializer</code></pre>
|
||||||
|
|
||||||
|
<h4>Deprecated Configs</h4>
|
||||||
|
<p>The following <code>StreamsConfig</code> parameters are deprecated in favor of passing parameters directly to serializer/deserializer constructors:</p>
|
||||||
|
<ul class="simple">
|
||||||
|
<li><code class="docutils literal"><span class="pre">StreamsConfig.WINDOWED_INNER_CLASS_SERDE</span></code> is deprecated in favor of <code class="docutils literal"><span class="pre">TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS</span></code> and <code class="docutils literal"><span class="pre">TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS</span></code></li>
|
||||||
|
<li><code class="docutils literal"><span class="pre">StreamsConfig.WINDOW_SIZE_MS_CONFIG</span></code> is deprecated in favor of <code class="docutils literal"><span class="pre">TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG</span></code></li>
|
||||||
|
</ul>
|
||||||
|
</div>
|
||||||
<div class="section" id="implementing-custom-serdes">
|
<div class="section" id="implementing-custom-serdes">
|
||||||
<span id="streams-developer-guide-serdes-custom"></span><h2>Implementing custom Serdes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h2>
|
<span id="streams-developer-guide-serdes-custom"></span><h2>Implementing custom Serdes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h2>
|
||||||
<p>If you need to implement custom Serdes, your best starting point is to take a look at the source code references of
|
<p>If you need to implement custom Serdes, your best starting point is to take a look at the source code references of
|
||||||
|
|
Loading…
Reference in New Issue