From 7c74f3983b1cdbca59bb17e9070c8b4386c563f8 Mon Sep 17 00:00:00 2001
From: Victoria Xia
Date: Wed, 12 Apr 2023 14:31:27 -0400
Subject: [PATCH] KAFKA-14491: [21/N] Docs updates for versioned state stores (#13444)

Add docs for KIP-889.

Reviewers: Matthias J. Sax
---
 docs/streams/developer-guide/dsl-api.html | 4 +-
 .../developer-guide/interactive-queries.html | 3 +-
 docs/streams/developer-guide/memory-mgmt.html | 3 +-
 .../developer-guide/processor-api.html | 85 +++++++++++++++++--
 docs/streams/upgrade-guide.html | 15 ++++
 docs/upgrade.html | 10 +++
 .../apache/kafka/streams/state/Stores.java | 24 ++++++
 7 files changed, 134 insertions(+), 10 deletions(-)

diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 3111f5137aa..adf0f33bb4b 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -852,7 +852,9 @@ KStream<byte[], String> repartitionedStream = stream.repartition(Repartiti
 defined window boundary.

Note: The following store types are used regardless of any store type specified via the materialized parameter:

diff --git a/docs/streams/developer-guide/interactive-queries.html b/docs/streams/developer-guide/interactive-queries.html
index 45bfdb916a7..9160dd8ddf4 100644
--- a/docs/streams/developer-guide/interactive-queries.html
+++ b/docs/streams/developer-guide/interactive-queries.html
@@ -106,7 +106,8 @@

Querying local state stores for an app instance

A Kafka Streams application typically runs on multiple instances. The state that is locally available on any given instance is only a subset of the application’s entire state. Querying the local stores on an instance will only return data locally available on that particular instance.

-

The method KafkaStreams#store(...) finds an application instance’s local state stores by name and type.

+

The method KafkaStreams#store(...) finds an application instance’s local state stores by name and type. + Note that interactive queries are not supported for versioned state stores at this time.

Every application instance can directly query any of its local state stores.

diff --git a/docs/streams/developer-guide/memory-mgmt.html b/docs/streams/developer-guide/memory-mgmt.html
index 3ceaacd503d..7f5bc09cef5 100644
--- a/docs/streams/developer-guide/memory-mgmt.html
+++ b/docs/streams/developer-guide/memory-mgmt.html
@@ -150,9 +150,10 @@ props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
 Serdes.String(), Serdes.Long())
 .withCachingEnabled();
+

Record caches are not supported for versioned state stores.

-

RocksDB

+

RocksDB

Each instance of RocksDB allocates off-heap memory for a block cache, index and filter blocks, and memtable (write buffer). Critical configs (for RocksDB version 4.1.0) include block_cache_size, write_buffer_size and max_write_buffer_number. These can be specified through the rocksdb.config.setter configuration.

diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index e5bab51be9d..586e55f6d0b 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -50,6 +50,7 @@
  • Fault-tolerant State Stores
  • Enable or Disable Fault Tolerance of State Stores (Store Changelogs)
  • Timestamped State Stores
  • +
  • Versioned Key-Value State Stores
  • Implementing Custom State Stores
  • @@ -261,12 +262,17 @@ space.
  • RocksDB settings can be fine-tuned, see RocksDB configuration.
  • -
  • Available store variants: - time window key-value store, session window key-value store.
  • -
  • Use persistentTimestampedKeyValueStore - when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries.
  • -
  • Use persistentTimestampedWindowStore - when you need a persistent windowedKey-(value/timestamp) store.
  • +
  • Available store variants: + timestamped key-value store, versioned key-value store, time window key-value store, session window key-value store.
  • +
  • Use persistentTimestampedKeyValueStore + when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries.
  • +
  • Use persistentVersionedKeyValueStore + when you need a persistent, versioned key-(value/timestamp) store that supports put/get/delete and timestamped get operations.
  • +
  • Use persistentWindowStore + or persistentTimestampedWindowStore + when you need a persistent timeWindowedKey-value or timeWindowedKey-(value/timestamp) store, respectively.
  • +
  • Use persistentSessionStore + when you need a persistent sessionWindowedKey-value store.
  • // Creating a persistent key-value store:
     // here, we create a `KeyValueStore<String, Long>` named "persistent-counts".
    @@ -300,6 +306,7 @@ KeyValueStore<String, Long> countStore = countStoreSupplier.build();
                                 
  • Use TimestampedWindowStore when you need to store windowedKey-(value/timestamp) pairs.
  • +
  • There is no built-in in-memory, versioned key-value store at this time.
  • // Creating an in-memory key-value store:
     // here, we create a `KeyValueStore<String, Long>` named "inmemory-counts".
    @@ -396,12 +403,76 @@ StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Store
                     
                     

    +
    +

    Versioned Key-Value State Stores

    +

    Versioned key-value state stores are available since Kafka Streams 3.5. + Rather than storing a single record version (value and timestamp) per key, + versioned state stores may store multiple record versions per key. This + allows versioned state stores to support timestamped retrieval operations + to return the latest record (per key) as of a specified timestamp.
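The "latest record as of a specified timestamp" lookup described above can be illustrated with a small stand-alone model. This is a minimal sketch only, not the Kafka Streams implementation: the class `VersionedStoreSketch` and its methods are hypothetical names introduced here, and history retention, deletions, and persistence are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy in-memory model of a versioned key-value store (hypothetical, for illustration only). */
class VersionedStoreSketch {
    // For each key, all record versions ordered by timestamp.
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    /** Adds a record version for the key, valid from the given timestamp. */
    void put(String key, String value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /** Returns the latest value whose timestamp is <= asOfTimestamp, or null if there is none. */
    String get(String key, long asOfTimestamp) {
        TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        VersionedStoreSketch store = new VersionedStoreSketch();
        store.put("k", "v1", 10L);
        store.put("k", "v2", 20L);
        System.out.println(store.get("k", 15L)); // v1: the version written at 10 is the latest as of 15
        System.out.println(store.get("k", 25L)); // v2
        System.out.println(store.get("k", 5L));  // null: no version existed yet at 5
    }
}
```

A non-versioned store would keep only `v2` for key `k`, so the lookup as of timestamp 15 could not be answered.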

    +

    You can create a persistent, versioned state store by passing a + VersionedBytesStoreSupplier + to the + versionedKeyValueStoreBuilder, + or by implementing your own + VersionedKeyValueStore.

    +

    Each versioned store has an associated, fixed-duration history retention + parameter which specifies how long old record versions should be kept for. + In particular, a versioned store guarantees to return accurate results for + timestamped retrieval operations where the timestamp being queried is within + history retention of the current observed stream time.

    +

    History retention also doubles as the store's grace period, which determines + how far back in time out-of-order writes to the store will be accepted. A + versioned store will not accept writes (inserts, updates, or deletions) if + the timestamp associated with the write is older than the current observed + stream time by more than the grace period. Stream time in this context is + tracked per-partition, rather than per-key, which means it's important + that the grace period (i.e., history retention) be set high enough to + accommodate a record with one key arriving out-of-order relative to a + record for another key.
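To make the per-partition behavior concrete, the following is a minimal stand-alone sketch of the grace-period check described above. It is a simplified model under stated assumptions, not the store's actual code: `GracePeriodSketch` and `tryWrite` are hypothetical names, and a single instance stands in for one partition.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the grace-period check for one partition (hypothetical, for illustration only). */
class GracePeriodSketch {
    private final long gracePeriodMs;
    // Stream time is the highest timestamp seen so far across ALL keys of the partition.
    private long observedStreamTime = Long.MIN_VALUE;
    // Latest accepted timestamp per key, kept only to show that it does not affect the check.
    private final Map<String, Long> latestAccepted = new HashMap<>();

    GracePeriodSketch(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    /** Returns true if the write is accepted, false if it is older than stream time by more than the grace period. */
    boolean tryWrite(String key, long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (observedStreamTime - timestamp > gracePeriodMs) {
            return false; // too far behind stream time, regardless of this key's own history
        }
        latestAccepted.merge(key, timestamp, Math::max);
        return true;
    }

    public static void main(String[] args) {
        GracePeriodSketch partition = new GracePeriodSketch(100L);
        System.out.println(partition.tryWrite("a", 1000L)); // true
        System.out.println(partition.tryWrite("b", 950L));  // true: 50 ms behind stream time
        System.out.println(partition.tryWrite("a", 2000L)); // true: key "a" advances stream time to 2000
        System.out.println(partition.tryWrite("b", 1850L)); // false: 150 ms behind, although newer than b's last write
    }
}
```

The last write is rejected even though it is the newest record ever seen for key `b`, because key `a` has already advanced the shared stream time.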

    +

    Because the memory footprint of versioned key-value stores is higher than + that of non-versioned key-value stores, you may want to adjust your + RocksDB memory settings + accordingly. Benchmarking your application with versioned stores is also + advised as performance is expected to be worse than when using non-versioned + stores.

    +

    Versioned stores do not support caching or interactive queries at this time. + Also, window stores may not be versioned.

    + Upgrade note: Versioned state stores are opt-in only; no automatic + upgrades from non-versioned to versioned stores will take place. +

    Upgrades are supported from persistent, non-versioned key-value stores + to persistent, versioned key-value stores as long as the original store + has the same changelog topic format as the versioned store being upgraded + to. Both persistent + key-value stores + and timestamped key-value stores + share the same changelog topic format as + persistent versioned key-value stores, + and therefore both are eligible for upgrades.

    +

    If you wish to upgrade an application using persistent, non-versioned + key-value stores to use persistent, versioned key-value stores + instead, you can perform the following procedure:

    +
      +
    • Stop all application instances, and + clear any local state directories + for the store(s) being upgraded.
    • +
    • Update your application code to use versioned stores where desired.
    • +
    • Update your changelog topic configs, for the relevant state stores, + to set the value of min.compaction.lag.ms + to be at least your desired history retention. History retention plus + one day is recommended as a buffer for the use of broker wall clock time + during compaction.
    • +
    • Restart your application instances and allow time for the versioned + stores to rebuild state from the changelog.
    • +
    +
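As a worked example of the changelog topic step in the procedure above, the recommended min.compaction.lag.ms value (history retention plus one day) can be computed as follows. This is only a sketch: `CompactionLagSketch` is a hypothetical helper, the 12-hour history retention is an assumed value, and applying the result to the topic is left to your usual topic configuration tooling.

```java
import java.time.Duration;

/** Computes a min.compaction.lag.ms value from history retention (hypothetical helper). */
class CompactionLagSketch {
    /** History retention plus the recommended one-day buffer, in milliseconds. */
    static long recommendedMinCompactionLagMs(Duration historyRetention) {
        return historyRetention.plus(Duration.ofDays(1)).toMillis();
    }

    public static void main(String[] args) {
        Duration historyRetention = Duration.ofHours(12); // assumed value for illustration
        // 12 hours + 24 hours = 36 hours = 129600000 ms
        System.out.println("min.compaction.lag.ms=" + recommendedMinCompactionLagMs(historyRetention));
    }
}
```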

    +

    Implementing Custom State Stores

    You can use the built-in state store types or implement your own. The primary interface to implement for the store is org.apache.kafka.streams.processor.StateStore. Kafka Streams also has a few extended interfaces such - as KeyValueStore.

    + as KeyValueStore and VersionedKeyValueStore.

    Note that your customized org.apache.kafka.streams.processor.StateStore implementation also needs to provide the logic on how to restore the state via the org.apache.kafka.streams.processor.StateRestoreCallback or org.apache.kafka.streams.processor.BatchingStateRestoreCallback interface. Details on how to instantiate these interfaces can be found in the javadocs.

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 1409b166564..b649ff68c60 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -131,6 +131,21 @@
 More details about the new config StreamsConfig#TOPOLOGY_OPTIMIZATION can be found in KIP-295.

    +

    Streams API changes in 3.5.0

    +

    + A new state store type, versioned key-value stores, was introduced in + KIP-889. + Rather than storing a single record version (value and timestamp) per key, + versioned state stores may store multiple record versions per key. This + allows versioned state stores to support timestamped retrieval operations + to return the latest record (per key) as of a specified timestamp. + For more information, including how to upgrade from a non-versioned key-value + store to a versioned store in an existing application, see the + Developer Guide section. + Versioned key-value stores are opt-in only; existing applications will not be + affected upon upgrading to 3.5 without explicit code changes. +

    +

    Streams API changes in 3.1.0

    The semantics of left/outer stream-stream join got improved via
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 65ddb4c000f..14c6a218b64 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,16 @@