KAFKA-14491: [21/N] Docs updates for versioned state stores (#13444)

Add docs for KIP-889.

Reviewers: Matthias J. Sax <matthias@confluent.io>
Victoria Xia 2023-04-12 14:31:27 -04:00 committed by GitHub
parent 9c0caca660
commit 7c74f3983b
7 changed files with 134 additions and 10 deletions

View File

@ -852,7 +852,9 @@ KStream&lt;byte[], String&gt; repartitionedStream = stream.repartition(Repartiti
defined window boundary.</p>
<p><b>Note:</b> The following store types are used regardless of the possibly specified type (via the parameter <code class="docutils literal"><span class="pre">materialized</span></code>):
<ul class="simple">
<li>non-windowed aggregations and non-windowed KTables use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html">TimestampedKeyValueStore</a>s</li>
<li>non-windowed aggregations and non-windowed KTables use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html">TimestampedKeyValueStore</a>s
or <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/VersionedKeyValueStore.html">VersionedKeyValueStore</a>s,
depending on whether the parameter <code class="docutils literal"><span class="pre">materialized</span></code> is versioned</li>
<li>time-windowed aggregations and KStream-KStream joins use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html">TimestampedWindowStore</a>s</li>
<li>session windowed aggregations use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/SessionStore.html">SessionStore</a>s (there is no timestamped session store as of now)</li>
</ul>

View File

@ -106,7 +106,8 @@
<div class="section" id="querying-local-state-stores-for-an-app-instance">
<span id="streams-developer-guide-interactive-queries-local-stores"></span><h2><a class="toc-backref" href="#id3">Querying local state stores for an app instance</a><a class="headerlink" href="#querying-local-state-stores-for-an-app-instance" title="Permalink to this headline"></a></h2>
<p>A Kafka Streams application typically runs on multiple instances. The state that is locally available on any given instance is only a subset of the <a class="reference internal" href="../architecture.html#streams-architecture-state"><span class="std std-ref">application&#8217;s entire state</span></a>. Querying the local stores on an instance will only return data locally available on that particular instance.</p>
<p>The method <code class="docutils literal"><span class="pre">KafkaStreams#store(...)</span></code> finds an application instance&#8217;s local state stores by name and type.</p>
<p>The method <code class="docutils literal"><span class="pre">KafkaStreams#store(...)</span></code> finds an application instance&#8217;s local state stores by name and type.
Note that interactive queries are not supported for <a class="reference internal" href="processor-api.html#streams-developer-guide-state-store-versioned"><span class="std std-ref">versioned state stores</span></a> at this time.</p>
<div class="figure align-center" id="id1">
<img class="centered" src="/{{version}}/images/streams-interactive-queries-api-01.png">
<p class="caption"><span class="caption-text">Every application instance can directly query any of its local state stores.</span></p>

View File

@ -150,9 +150,10 @@ props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);</code></pre>
Serdes.String(),
Serdes.Long())
.withCachingEnabled();</code></pre>
<p>Record caches are not supported for <a class="reference internal" href="processor-api.html#streams-developer-guide-state-store-versioned"><span class="std std-ref">versioned state stores</span></a>.</p>
</div>
<div class="section" id="rocksdb">
<h2><a class="toc-backref" href="#id3">RocksDB</a><a class="headerlink" href="#rocksdb" title="Permalink to this headline"></a></h2>
<span id="streams-developer-guide-memory-management-rocksdb"></span><h2><a class="toc-backref" href="#id3">RocksDB</a><a class="headerlink" href="#rocksdb" title="Permalink to this headline"></a></h2>
<p> Each instance of RocksDB allocates off-heap memory for a block cache, index and filter blocks, and memtable (write buffer). Critical configs (for RocksDB version 4.1.0) include
<code class="docutils literal"><span class="pre">block_cache_size</span></code>, <code class="docutils literal"><span class="pre">write_buffer_size</span></code> and <code class="docutils literal"><span class="pre">max_write_buffer_number</span></code>. These can be specified through the
<code class="docutils literal"><span class="pre">rocksdb.config.setter</span></code> configuration.</p>

View File

@ -50,6 +50,7 @@
<li><a class="reference internal" href="#fault-tolerant-state-stores" id="id5">Fault-tolerant State Stores</a></li>
<li><a class="reference internal" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" id="id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a></li>
<li><a class="reference internal" href="#timestamped-state-stores" id="id11">Timestamped State Stores</a></li>
<li><a class="reference internal" href="#versioned-state-stores" id="id12">Versioned Key-Value State Stores</a></li>
<li><a class="reference internal" href="#implementing-custom-state-stores" id="id7">Implementing Custom State Stores</a></li>
</ul>
</li>
@ -261,12 +262,17 @@
space.</li>
<li>RocksDB settings can be fine-tuned, see
<a class="reference internal" href="config-streams.html#streams-developer-guide-rocksdb-config"><span class="std std-ref">RocksDB configuration</span></a>.</li>
<li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore-java.lang.String-">store variants</a>:
time window key-value store, session window key-value store.</li>
<li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore-java.lang.String-">persistentTimestampedKeyValueStore</a>
when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries.</li>
<li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore-java.lang.String-java.time.Duration-java.time.Duration-boolean-">persistentTimestampedWindowStore</a>
when you need a persistent windowedKey-(value/timestamp) store.</li>
<li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore(java.lang.String)">store variants</a>:
timestamped key-value store, versioned key-value store, time window key-value store, session window key-value store.</li>
<li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore(java.lang.String)">persistentTimestampedKeyValueStore</a>
when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries.</li>
<li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration)">persistentVersionedKeyValueStore</a>
when you need a persistent, versioned key-(value/timestamp) store that supports put/get/delete and timestamped get operations.</li>
<li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore(java.lang.String,java.time.Duration,java.time.Duration,boolean)">persistentWindowStore</a>
or <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore(java.lang.String,java.time.Duration,java.time.Duration,boolean)">persistentTimestampedWindowStore</a>
when you need a persistent timeWindowedKey-value or timeWindowedKey-(value/timestamp) store, respectively.</li>
<li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore(java.lang.String,java.time.Duration)">persistentSessionStore</a>
when you need a persistent sessionWindowedKey-value store.</li>
</ul>
<pre class="line-numbers"><code class="language-java">// Creating a persistent key-value store:
// here, we create a `KeyValueStore&lt;String, Long&gt;` named &quot;persistent-counts&quot;.
@ -300,6 +306,7 @@ KeyValueStore&lt;String, Long&gt; countStore = countStoreSupplier.build();</code
when you need a key-(value/timestamp) store that supports put/get/delete and range queries.</li>
<li>Use <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html">TimestampedWindowStore</a>
when you need to store windowedKey-(value/timestamp) pairs.</li>
<li>There is no built-in in-memory, versioned key-value store at this time.</li>
</ul>
<pre class="line-numbers"><code class="language-java">// Creating an in-memory key-value store:
// here, we create a `KeyValueStore&lt;String, Long&gt;` named &quot;inmemory-counts&quot;.
@ -396,12 +403,76 @@ StoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; countStoreSupplier = Store
</ul>
</p>
</div>
<div class="section" id="versioned-state-stores">
<span id="streams-developer-guide-state-store-versioned"></span><h3><a class="toc-backref" href="#id12">Versioned Key-Value State Stores</a><a class="headerlink" href="#versioned-state-stores" title="Permalink to this headline"></a></h3>
<p>Versioned key-value state stores are available since Kafka Streams 3.5.
Rather than storing a single record version (value and timestamp) per key,
versioned state stores may store multiple record versions per key. This
allows versioned state stores to support timestamped retrieval operations
to return the latest record (per key) as of a specified timestamp.</p>
<p>You can create a persistent, versioned state store by passing a
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration)">VersionedBytesStoreSupplier</a>
to the
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#versionedKeyValueStoreBuilder(java.lang.String,java.time.Duration)">versionedKeyValueStoreBuilder</a>,
or by implementing your own
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/VersionedKeyValueStore.html">VersionedKeyValueStore</a>.</p>
<p>Each versioned store has an associated, fixed-duration <em>history retention</em>
parameter which specifies how long old record versions should be kept for.
In particular, a versioned store guarantees to return accurate results for
timestamped retrieval operations where the timestamp being queried is within
history retention of the current observed stream time.</p>
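<p>The retrieval semantics described above can be illustrated with a toy, in-memory model.
This is only a sketch under stated assumptions, not the Kafka Streams implementation: the class
<code class="docutils literal"><span class="pre">VersionedLookupSketch</span></code> and all of its
methods are hypothetical names, keys and values are plain strings, and timestamps are assumed to be
non-negative milliseconds.</p>

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy in-memory model of timestamped retrieval; NOT the Kafka Streams implementation. */
class VersionedLookupSketch {
    private final long historyRetentionMs;
    // Highest timestamp seen so far; -1 means "no records yet" (timestamps assumed non-negative).
    private long observedStreamTimeMs = -1L;
    // key -> (record timestamp -> value); the TreeMap keeps versions ordered by timestamp.
    private final Map<String, TreeMap<Long, String>> versionsByKey = new HashMap<>();

    VersionedLookupSketch(long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    /** Stores a new record version for the key and advances the observed stream time. */
    void put(String key, String value, long timestampMs) {
        versionsByKey.computeIfAbsent(key, k -> new TreeMap<>()).put(timestampMs, value);
        observedStreamTimeMs = Math.max(observedStreamTimeMs, timestampMs);
    }

    /** Returns the latest value whose timestamp is <= asOfTimestampMs, or null if there is none. */
    String get(String key, long asOfTimestampMs) {
        TreeMap<Long, String> versions = versionsByKey.get(key);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, String> latest = versions.floorEntry(asOfTimestampMs);
        return latest == null ? null : latest.getValue();
    }

    /** True if the queried timestamp lies within history retention of the observed stream time. */
    boolean withinHistoryRetention(long asOfTimestampMs) {
        return asOfTimestampMs >= observedStreamTimeMs - historyRetentionMs;
    }

    public static void main(String[] args) {
        VersionedLookupSketch store = new VersionedLookupSketch(60_000L);
        store.put("sensor-1", "v1", 10_000L);
        store.put("sensor-1", "v2", 20_000L);
        System.out.println(store.get("sensor-1", 15_000L)); // prints "v1"
        System.out.println(store.get("sensor-1", 20_000L)); // prints "v2"
        System.out.println(store.get("sensor-1", 5_000L));  // prints "null"
    }
}
```

<p>In this model, a query is only guaranteed to be accurate when
<code class="docutils literal"><span class="pre">withinHistoryRetention(...)</span></code> holds for the
queried timestamp; how a real store treats older versions is not modeled here.</p>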
<p>History retention also doubles as the store's <em>grace period</em>, which determines
how far back in time out-of-order writes to the store will be accepted. A
versioned store will not accept writes (inserts, updates, or deletions) if
the timestamp associated with the write is older than the current observed
stream time by more than the grace period. Stream time in this context is
tracked per-partition, rather than per-key, which means it's important
that the grace period (i.e., history retention) be set high enough to
accommodate a record with one key arriving out-of-order relative to a
record for another key.</p>
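<p>The interaction between per-partition stream time and the grace period can be sketched as follows.
This is a toy model rather than the actual store logic: the class
<code class="docutils literal"><span class="pre">GracePeriodSketch</span></code> and its
<code class="docutils literal"><span class="pre">offer</span></code> method are hypothetical, a single instance
stands in for one partition, and timestamps are assumed to be non-negative milliseconds.</p>

```java
/** Toy model of the grace-period check for one partition; NOT the Kafka Streams implementation. */
class GracePeriodSketch {
    private final long gracePeriodMs; // for versioned stores this equals history retention
    // Highest timestamp seen across ALL keys; -1 means "no records yet".
    private long observedStreamTimeMs = -1L;

    GracePeriodSketch(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    /**
     * Returns true if a write with this timestamp would be accepted. The key is deliberately
     * unused in the check: stream time is shared by every key in the partition.
     */
    boolean offer(String key, long timestampMs) {
        if (observedStreamTimeMs - timestampMs > gracePeriodMs) {
            return false; // older than stream time by more than the grace period
        }
        observedStreamTimeMs = Math.max(observedStreamTimeMs, timestampMs);
        return true;
    }

    public static void main(String[] args) {
        GracePeriodSketch partition = new GracePeriodSketch(60_000L);
        System.out.println(partition.offer("A", 100_000L)); // prints "true"
        // First write for key "B", but 70s behind stream time, which exceeds the 60s grace period.
        System.out.println(partition.offer("B", 30_000L));  // prints "false"
        System.out.println(partition.offer("B", 50_000L));  // prints "true"
    }
}
```

<p>The second write is rejected even though key <code class="docutils literal"><span class="pre">B</span></code>
has never been written before, because the record for key <code class="docutils literal"><span class="pre">A</span></code>
already advanced the shared stream time.</p>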
<p>Because the memory footprint of versioned key-value stores is higher than
that of non-versioned key-value stores, you may want to adjust your
<a class="reference internal" href="memory-mgmt.html#streams-developer-guide-memory-management-rocksdb"><span class="std std-ref">RocksDB memory settings</span></a>
accordingly. Benchmarking your application with versioned stores is also
advised as performance is expected to be worse than when using non-versioned
stores.</p>
<p>Versioned stores do not support caching or interactive queries at this time.
Also, window stores cannot be versioned.</p>
<p><b>Upgrade note:</b> Versioned state stores are opt-in only; no automatic
upgrades from non-versioned to versioned stores will take place.</p>
<p>Upgrades are supported from persistent, non-versioned key-value stores
to persistent, versioned key-value stores as long as the original store
has the same changelog topic format as the versioned store being upgraded
to. Both persistent
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore(java.lang.String)">key-value stores</a>
and <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore(java.lang.String)">timestamped key-value stores</a>
share the same changelog topic format as
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration)">persistent versioned key-value stores</a>,
and therefore both are eligible for upgrades.</p>
<p>If you wish to upgrade an application using persistent, non-versioned
key-value stores to use persistent, versioned key-value stores
instead, you can perform the following procedure:</p>
<ul class="first simple">
<li>Stop all application instances, and
<a class="reference internal" href="app-reset-tool.html#streams-developer-guide-reset-local-environment"><span class="std std-ref">clear any local state directories</span></a>
for the store(s) being upgraded.</li>
<li>Update your application code to use versioned stores where desired.</li>
<li>Update your changelog topic configs, for the relevant state stores,
to set the value of <code class="docutils literal"><span class="pre">min.compaction.lag.ms</span></code>
to be at least your desired history retention. History retention plus
one day is recommended as a buffer for the use of broker wall clock time
during compaction.</li>
<li>Restart your application instances and allow time for the versioned
stores to rebuild state from the changelog.</li>
</ul>
</p>
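<p>As a small worked example of the changelog topic recommendation in the procedure above, the following
sketch computes a value for <code class="docutils literal"><span class="pre">min.compaction.lag.ms</span></code>
as history retention plus one day. The class and method names are hypothetical, and the six-hour history
retention is only an illustrative assumption.</p>

```java
import java.time.Duration;

/** Computes a suggested min.compaction.lag.ms value: history retention plus a one-day buffer. */
class CompactionLagSketch {
    static long minCompactionLagMs(Duration historyRetention) {
        return historyRetention.plus(Duration.ofDays(1)).toMillis();
    }

    public static void main(String[] args) {
        Duration historyRetention = Duration.ofHours(6); // hypothetical history retention
        // 6 hours + 24 hours = 30 hours = 108000000 ms
        System.out.println("min.compaction.lag.ms=" + minCompactionLagMs(historyRetention));
    }
}
```

<p>The resulting number would then be applied to the changelog topic of each upgraded store using
whatever topic configuration tooling you already use.</p>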
</div>
<div class="section" id="implementing-custom-state-stores">
<span id="streams-developer-guide-state-store-custom"></span><h3><a class="toc-backref" href="#id7">Implementing Custom State Stores</a><a class="headerlink" href="#implementing-custom-state-stores" title="Permalink to this headline"></a></h3>
<p>You can use the <a class="reference internal" href="#streams-developer-guide-state-store-defining"><span class="std std-ref">built-in state store types</span></a> or implement your own.
The primary interface to implement for the store is
<code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStore</span></code>. Kafka Streams also has a few extended interfaces such
as <code class="docutils literal"><span class="pre">KeyValueStore</span></code>.</p>
as <code class="docutils literal"><span class="pre">KeyValueStore</span></code> and <code class="docutils literal"><span class="pre">VersionedKeyValueStore</span></code>.</p>
<p>Note that your customized <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateStore</span></code> implementation also needs to provide the logic on how to restore the state
via the <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.StateRestoreCallback</span></code> or <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.BatchingStateRestoreCallback</span></code> interface.
Details on how to instantiate these interfaces can be found in the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/StateStore.html">javadocs</a>.</p>

View File

@ -131,6 +131,21 @@
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
</p>
<h3><a id="streams_api_changes_350" href="#streams_api_changes_350">Streams API changes in 3.5.0</a></h3>
<p>
A new state store type, versioned key-value stores, was introduced in
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores">KIP-889</a>.
Rather than storing a single record version (value and timestamp) per key,
versioned state stores may store multiple record versions per key. This
allows versioned state stores to support timestamped retrieval operations
to return the latest record (per key) as of a specified timestamp.
For more information, including how to upgrade from a non-versioned key-value
store to a versioned store in an existing application, see the
<a href="/{{version}}/documentation/streams/developer-guide/processor-api.html#versioned-state-stores">Developer Guide</a> section.
Versioned key-value stores are opt-in only; existing applications will not be
affected upon upgrading to 3.5 without explicit code changes.
</p>
<h3><a id="streams_api_changes_310" href="#streams_api_changes_310">Streams API changes in 3.1.0</a></h3>
<p>
The semantics of left/outer stream-stream join got improved via

View File

@ -19,6 +19,16 @@
<script id="upgrade-template" type="text/x-handlebars-template">
<h5><a id="upgrade_350_notable" href="#upgrade_350_notable">Notable changes in 3.5.0</a></h5>
<ul>
<li>Kafka Streams has introduced a new state store type, versioned key-value stores,
for storing multiple record versions per key, thereby enabling timestamped retrieval
operations to return the latest record (per key) as of a specified timestamp.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores">KIP-889</a>
for more details.
</li>
</ul>
<h4><a id="upgrade_3_4_0" href="#upgrade_3_4_0">Upgrading to 3.4.0 from any version 0.8.x through 3.3.x</a></h4>
<p><b>If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.

View File

@ -121,6 +121,11 @@ public final class Stores {
* <p>
* This store supplier can be passed into a
* {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)}.
* <p>
* Note that it is not safe to change the value of {@code historyRetention} between
* application restarts without clearing local state from application instances,
* as this may cause incorrect values to be read from the state store if it impacts
* the underlying storage format.
*
* @param name name of the store (cannot be {@code null})
* @param historyRetention length of time that old record versions are available for query
@ -148,6 +153,10 @@ public final class Stores {
* <p>
* This store supplier can be passed into a
* {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)}.
* <p>
* Note that it is not safe to change the value of {@code segmentInterval} between
* application restarts without clearing local state from application instances,
* as this may cause incorrect values to be read from the state store otherwise.
*
* @param name name of the store (cannot be {@code null})
* @param historyRetention length of time that old record versions are available for query
@ -239,6 +248,11 @@ public final class Stores {
* This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
* If you want to create a {@link TimestampedWindowStore} you should use
* {@link #persistentTimestampedWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
* <p>
* Note that it is not safe to change the value of {@code retentionPeriod} between
* application restarts without clearing local state from application instances,
* as this may cause incorrect values to be read from the state store if it impacts
* the underlying storage format.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
@ -266,6 +280,11 @@ public final class Stores {
* {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
* If you want to create a {@link WindowStore} you should use
* {@link #persistentWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
* <p>
* Note that it is not safe to change the value of {@code retentionPeriod} between
* application restarts without clearing local state from application instances,
* as this may cause incorrect values to be read from the state store if it impacts
* the underlying storage format.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
@ -380,6 +399,11 @@ public final class Stores {
/**
* Create a persistent {@link SessionBytesStoreSupplier}.
* <p>
* Note that it is not safe to change the value of {@code retentionPeriod} between
* application restarts without clearing local state from application instances,
* as this may cause incorrect values to be read from the state store if it impacts
* the underlying storage format.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)