KAFKA-9559 add docs for changing default serde to null (#10988)

#10813 changed the default serde from ByteArraySerde as discussed in KIP-741. This adds proper documentation so users know to set a serde through the configs or explicitly pass one in.

Reviewers: Walker Carlson <wcarlson@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
leah 2021-07-08 13:33:22 -05:00 committed by GitHub
parent 66e8b8b413
commit d6918e427e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 25 deletions

View File

@ -211,8 +211,9 @@ settings.put(... , ...);</code></pre>
</tr> </tr>
<tr class="row-even"><td>default.key.serde</td> <tr class="row-even"><td>default.key.serde</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">Default serializer/deserializer class for record keys, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface (see also default.value.serde).</td> <td colspan="2">Default serializer/deserializer class for record keys, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface. Must be
<td><code class="docutils literal"><span class="pre">Serdes.ByteArray().getClass().getName()</span></code></td> set by the user or all serdes must be passed in explicitly (see also default.value.serde).</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>default.production.exception.handler</td> <tr class="row-odd"><td>default.production.exception.handler</td>
<td>Medium</td> <td>Medium</td>
@ -226,8 +227,9 @@ settings.put(... , ...);</code></pre>
</tr> </tr>
<tr class="row-odd"><td>default.value.serde</td> <tr class="row-odd"><td>default.value.serde</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">Default serializer/deserializer class for record values, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface (see also default.key.serde).</td> <td colspan="2">Default serializer/deserializer class for record values, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface. Must be
<td><code class="docutils literal"><span class="pre">Serdes.ByteArray().getClass().getName()</span></code></td> set by the user or all serdes must be passed in explicitly (see also default.key.serde).</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr> </tr>
<tr class="row-even"><td>default.windowed.key.serde.inner</td> <tr class="row-even"><td>default.windowed.key.serde.inner</td>
<td>Medium</td> <td>Medium</td>
@ -549,7 +551,7 @@ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
<div class="section" id="default-key-serde"> <div class="section" id="default-key-serde">
<h4><a class="toc-backref" href="#id8">default.key.serde</a><a class="headerlink" href="#default-key-serde" title="Permalink to this headline"></a></h4> <h4><a class="toc-backref" href="#id8">default.key.serde</a><a class="headerlink" href="#default-key-serde" title="Permalink to this headline"></a></h4>
<blockquote> <blockquote>
<div><p>The default Serializer/Deserializer class for record keys. Serialization and deserialization in Kafka Streams happens <div><p>The default Serializer/Deserializer class for record keys, null unless set by user. Serialization and deserialization in Kafka Streams happens
whenever data needs to be materialized, for example:</p> whenever data needs to be materialized, for example:</p>
<div><ul class="simple"> <div><ul class="simple">
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li> <li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>
@ -562,7 +564,7 @@ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
<div class="section" id="default-value-serde"> <div class="section" id="default-value-serde">
<h4><a class="toc-backref" href="#id9">default.value.serde</a><a class="headerlink" href="#default-value-serde" title="Permalink to this headline"></a></h4> <h4><a class="toc-backref" href="#id9">default.value.serde</a><a class="headerlink" href="#default-value-serde" title="Permalink to this headline"></a></h4>
<blockquote> <blockquote>
<div><p>The default Serializer/Deserializer class for record values. Serialization and deserialization in Kafka Streams <div><p>The default Serializer/Deserializer class for record values, null unless set by user. Serialization and deserialization in Kafka Streams
happens whenever data needs to be materialized, for example:</p> happens whenever data needs to be materialized, for example:</p>
<ul class="simple"> <ul class="simple">
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li> <li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>

View File

@ -33,28 +33,30 @@
<div class="section" id="data-types-and-serialization"> <div class="section" id="data-types-and-serialization">
<span id="streams-developer-guide-serdes"></span><h1>Data Types and Serialization<a class="headerlink" href="#data-types-and-serialization" title="Permalink to this headline"></a></h1> <span id="streams-developer-guide-serdes"></span><h1>Data Types and Serialization<a class="headerlink" href="#data-types-and-serialization" title="Permalink to this headline"></a></h1>
<p>Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. <code class="docutils literal"><span class="pre">java.lang.String</span></code>) to materialize the data when necessary. Operations that require such SerDes information include: <code class="docutils literal"><span class="pre">stream()</span></code>, <code class="docutils literal"><span class="pre">table()</span></code>, <code class="docutils literal"><span class="pre">to()</span></code>, <code class="docutils literal"><span class="pre">repartition()</span></code>, <code class="docutils literal"><span class="pre">groupByKey()</span></code>, <code class="docutils literal"><span class="pre">groupBy()</span></code>.</p> <p>Every Kafka Streams application must provide Serdes (Serializer/Deserializer) for the data types of record keys and record values (e.g. <code class="docutils literal"><span class="pre">java.lang.String</span></code>) to materialize the data when necessary. Operations that require such Serdes information include: <code class="docutils literal"><span class="pre">stream()</span></code>, <code class="docutils literal"><span class="pre">table()</span></code>, <code class="docutils literal"><span class="pre">to()</span></code>, <code class="docutils literal"><span class="pre">repartition()</span></code>, <code class="docutils literal"><span class="pre">groupByKey()</span></code>, <code class="docutils literal"><span class="pre">groupBy()</span></code>.</p>
<p>You can provide SerDes by using either of these methods:</p> <p>You can provide Serdes by using either of these methods, but you must use at least one:</p>
<ul class="simple"> <ul class="simple">
<li>By setting default SerDes in the <code class="docutils literal"><span class="pre">java.util.Properties</span></code> config instance.</li> <li>By setting default Serdes in the <code class="docutils literal"><span class="pre">java.util.Properties</span></code> config instance.</li>
<li>By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults.</li> <li>By specifying explicit Serdes when calling the appropriate API methods, thus overriding the defaults.</li>
</ul> </ul>
<p class="topic-title first"><b>Table of Contents</b></p> <p class="topic-title first"><b>Table of Contents</b></p>
<ul class="simple"> <ul class="simple">
<li><a class="reference internal" href="#configuring-serdes" id="id1">Configuring SerDes</a></li> <li><a class="reference internal" href="#configuring-serdes" id="id1">Configuring Serdes</a></li>
<li><a class="reference internal" href="#overriding-default-serdes" id="id2">Overriding default SerDes</a></li> <li><a class="reference internal" href="#overriding-default-serdes" id="id2">Overriding default Serdes</a></li>
<li><a class="reference internal" href="#available-serdes" id="id3">Available SerDes</a></li> <li><a class="reference internal" href="#available-serdes" id="id3">Available Serdes</a></li>
<ul> <ul>
<li><a class="reference internal" href="#primitive-and-basic-types" id="id4">Primitive and basic types</a></li> <li><a class="reference internal" href="#primitive-and-basic-types" id="id4">Primitive and basic types</a></li>
<li><a class="reference internal" href="#json" id="id6">JSON</a></li> <li><a class="reference internal" href="#json" id="id6">JSON</a></li>
<li><a class="reference internal" href="#implementing-custom-serdes" id="id5">Implementing custom serdes</a></li> <li><a class="reference internal" href="#implementing-custom-serdes" id="id5">Implementing custom serdes</a></li>
</ul> </ul>
<li><a class="reference internal" href="#scala-dsl-serdes" id="id8">Kafka Streams DSL for Scala Implicit SerDes</a></li> <li><a class="reference internal" href="#scala-dsl-serdes" id="id8">Kafka Streams DSL for Scala Implicit Serdes</a></li>
</ul> </ul>
<div class="section" id="configuring-serdes"> <div class="section" id="configuring-serdes">
<h2>Configuring SerDes<a class="headerlink" href="#configuring-serdes" title="Permalink to this headline"></a></h2> <h2>Configuring Serdes<a class="headerlink" href="#configuring-serdes" title="Permalink to this headline"></a></h2>
<p>SerDes specified in the Streams configuration are used as the default in your Kafka Streams application.</p> <p>Serdes specified in the Streams configuration are used as the default in your Kafka Streams application.
Because this config's default is null, you must either set a default SerDe by using this
configuration or pass in Serdes explicitly, as described below.</p>
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.common.serialization.Serdes; <pre class="line-numbers"><code class="language-java">import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
@ -65,8 +67,8 @@ settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getCl
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());</code></pre> settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());</code></pre>
</div> </div>
<div class="section" id="overriding-default-serdes"> <div class="section" id="overriding-default-serdes">
<h2>Overriding default SerDes<a class="headerlink" href="#overriding-default-serdes" title="Permalink to this headline"></a></h2> <h2>Overriding default Serdes<a class="headerlink" href="#overriding-default-serdes" title="Permalink to this headline"></a></h2>
<p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:</p> <p>You can also specify Serdes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:</p>
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.common.serialization.Serde; <pre class="line-numbers"><code class="language-java">import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
@ -93,7 +95,7 @@ userCountByRegion.to(&quot;RegionCountsTopic&quot;, Produced.valueSerde(Serdes.L
</p> </p>
</div> </div>
<div class="section" id="available-serdes"> <div class="section" id="available-serdes">
<span id="streams-developer-guide-serdes-available"></span><h2>Available SerDes<a class="headerlink" href="#available-serdes" title="Permalink to this headline"></a></h2> <span id="streams-developer-guide-serdes-available"></span><h2>Available Serdes<a class="headerlink" href="#available-serdes" title="Permalink to this headline"></a></h2>
<div class="section" id="primitive-and-basic-types"> <div class="section" id="primitive-and-basic-types">
<h3>Primitive and basic types<a class="headerlink" href="#primitive-and-basic-types" title="Permalink to this headline"></a></h3> <h3>Primitive and basic types<a class="headerlink" href="#primitive-and-basic-types" title="Permalink to this headline"></a></h3>
<p>Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as <code class="docutils literal"><span class="pre">byte[]</span></code> in <p>Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as <code class="docutils literal"><span class="pre">byte[]</span></code> in
@ -159,9 +161,9 @@ userCountByRegion.to(&quot;RegionCountsTopic&quot;, Produced.valueSerde(Serdes.L
</p> </p>
</div> </div>
<div class="section" id="implementing-custom-serdes"> <div class="section" id="implementing-custom-serdes">
<span id="streams-developer-guide-serdes-custom"></span><h2>Implementing custom SerDes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h2> <span id="streams-developer-guide-serdes-custom"></span><h2>Implementing custom Serdes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h2>
<p>If you need to implement custom SerDes, your best starting point is to take a look at the source code references of <p>If you need to implement custom Serdes, your best starting point is to take a look at the source code references of
existing SerDes (see previous section). Typically, your workflow will be similar to:</p> existing Serdes (see previous section). Typically, your workflow will be similar to:</p>
<ol class="arabic simple"> <ol class="arabic simple">
<li>Write a <em>serializer</em> for your data type <code class="docutils literal"><span class="pre">T</span></code> by implementing <li>Write a <em>serializer</em> for your data type <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java">org.apache.kafka.common.serialization.Serializer</a>.</li> <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java">org.apache.kafka.common.serialization.Serializer</a>.</li>
@ -169,7 +171,7 @@ userCountByRegion.to(&quot;RegionCountsTopic&quot;, Produced.valueSerde(Serdes.L
<a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java">org.apache.kafka.common.serialization.Deserializer</a>.</li> <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java">org.apache.kafka.common.serialization.Deserializer</a>.</li>
<li>Write a <em>serde</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing <li>Write a <em>serde</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java">org.apache.kafka.common.serialization.Serde</a>, <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java">org.apache.kafka.common.serialization.Serde</a>,
which you either do manually (see existing SerDes in the previous section) or by leveraging helper functions in which you either do manually (see existing Serdes in the previous section) or by leveraging helper functions in
<a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java">Serdes</a> <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java">Serdes</a>
such as <code class="docutils literal"><span class="pre">Serdes.serdeFrom(Serializer&lt;T&gt;, Deserializer&lt;T&gt;)</span></code>. such as <code class="docutils literal"><span class="pre">Serdes.serdeFrom(Serializer&lt;T&gt;, Deserializer&lt;T&gt;)</span></code>.
Note that you will need to implement your own class (that has no generic types) if you want to use your custom serde in the configuration provided to <code class="docutils literal"><span class="pre">KafkaStreams</span></code>. Note that you will need to implement your own class (that has no generic types) if you want to use your custom serde in the configuration provided to <code class="docutils literal"><span class="pre">KafkaStreams</span></code>.
@ -179,8 +181,8 @@ userCountByRegion.to(&quot;RegionCountsTopic&quot;, Produced.valueSerde(Serdes.L
</div> </div>
</div> </div>
<div class="section" id="scala-dsl-serdes"> <div class="section" id="scala-dsl-serdes">
<h2>Kafka Streams DSL for Scala Implicit SerDes<a class="headerlink" href="scala-dsl-serdes" title="Permalink to this headline"></a></h2> <h2>Kafka Streams DSL for Scala Implicit Serdes<a class="headerlink" href="scala-dsl-serdes" title="Permalink to this headline"></a></h2>
<p>When using the <a href="dsl-api.html#scala-dsl">Kafka Streams DSL for Scala</a> you're not required to configure a default SerDes. In fact, it's not supported. SerDes are instead provided implicitly by default implementations for common primitive datatypes. See the <a href="dsl-api.html#scala-dsl-implicit-serdes">Implicit SerDes</a> and <a href="dsl-api.html#scala-dsl-user-defined-serdes">User-Defined SerDes</a> sections in the DSL API documentation for details</p> <p>When using the <a href="dsl-api.html#scala-dsl">Kafka Streams DSL for Scala</a> you're not required to configure a default Serdes. In fact, it's not supported. Serdes are instead provided implicitly by default implementations for common primitive datatypes. See the <a href="dsl-api.html#scala-dsl-implicit-serdes">Implicit Serdes</a> and <a href="dsl-api.html#scala-dsl-user-defined-serdes">User-Defined Serdes</a> sections in the DSL API documentation for details</p>
</div> </div>

View File

@ -232,6 +232,11 @@
Kafka Streams throws a <code>TimeoutException</code> Kafka Streams throws a <code>TimeoutException</code>
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams">KIP-572</a>). (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams">KIP-572</a>).
</p> </p>
<p>
We changed the default value of <code>default.key.serde</code> and <code>default.value.serde</code> to be <code>null</code> instead of <code>ByteArraySerde</code>.
Users will now see a <code>ConfigException</code> if their serdes are not correctly configured through those configs or passed in explicitly.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-741%3A+Change+default+serde+to+be+null">KIP-741</a> for more details.
</p>
<h3><a id="streams_api_changes_270" href="#streams_api_changes_270">Streams API changes in 2.7.0</a></h3> <h3><a id="streams_api_changes_270" href="#streams_api_changes_270">Streams API changes in 2.7.0</a></h3>
<p> <p>