From e847f057e31cbd50db854f12f2eafe0d1a865068 Mon Sep 17 00:00:00 2001
From: Bill Bejeck 
         The messaging layer of Kafka partitions data for storing and transporting it. Kafka Streams partitions data for processing it.
@@ -91,7 +91,7 @@
      
     
 
Kafka Streams allows the user to configure the number of threads that the library can use to parallelize processing within an application instance.
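A minimal sketch (not part of this patch) of how the thread count is typically configured, via the num.stream.threads property of StreamsConfig:
Properties streamsSettings = new Properties();
// Run four processing threads in this application instance
streamsSettings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);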
@@ -112,7 +112,7 @@
         Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data,
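A rough sketch (not from the patch) of a DSL aggregation backed by a named, queryable state store; the topic and store names are illustrative:
StreamsBuilder builder = new StreamsBuilder();
KTable<String, Long> counts = builder.<String, String>stream("input-topic")
    .groupByKey()
    // Materialize the aggregate into a local state store named "counts-store" (also backed by a changelog topic)
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));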
@@ -131,7 +131,7 @@
      
     
 
Kafka Streams builds on fault-tolerance capabilities integrated natively within Kafka. Kafka partitions are highly available and replicated; so when stream data is persisted to Kafka it is available
@@ -165,10 +165,10 @@
-Note
-If you enable n standby tasks, you need to provision n+1 KafkaStreams
-              instances.
+Note
+If you enable n standby tasks, you need to provision n+1 KafkaStreams
+            instances.
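As a hedged illustration (not taken from the patch), standby tasks are enabled through the num.standby.replicas property:
Properties streamsSettings = new Properties();
// One standby replica per stateful task; remember to provision n+1 = 2 application instances
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);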
@@ -664,22 +661,22 @@
processing.guarantee
The processing guarantee that should be used. Possible values are "at_least_once" (default), "exactly_once", and "exactly_once_beta".
Using "exactly_once" requires broker version 0.11.0 or newer, while using "exactly_once_beta" requires broker version 2.5 or newer.
Note that if exactly-once processing is enabled, the default for parameter commit.interval.ms changes to 100ms.
Additionally, consumers are configured with isolation.level="read_committed" and producers are configured with enable.idempotence=true per default.
Note that by default exactly-once processing requires a cluster of at least three brokers, which is the recommended setting for production.
For development, you can change this configuration by adjusting the broker settings transaction.state.log.replication.factor and transaction.state.log.min.isr to the number of brokers you want to use.
For more details see Processing Guarantees.
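A minimal sketch (not from the patch) of enabling exactly-once processing; the choice of value depends on your broker version:
Properties streamsSettings = new Properties();
// "exactly_once_beta" needs brokers 2.5+; use "exactly_once" for brokers 0.11.0+
streamsSettings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);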
// Get a reference to the existing table config rather than create a new one, so you don't accidentally
// overwrite defaults such as the BloomFilter, which is an important optimization.
BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
// Modify the default block size per these instructions from the RocksDB GitHub.
tableConfig.setBlockSize(16 * 1024L);
// Do not let the index and filter blocks grow unbounded. For more information, see the RocksDB GitHub.
tableConfig.setCacheIndexAndFilterBlocks(true);
// See the advanced options in the RocksDB GitHub.
options.setMaxWriteBufferNumber(2);
// To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. See RocksJava docs for more details.
cache.close();
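For context, a rough, self-contained sketch of where such calls usually live: an implementation of the rocksdb.config.setter interface (the class name CustomRocksDBConfig and the cache size are illustrative, not from the patch):
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    // Keep the cache as a field so it can be released in close()
    private final Cache cache = new LRUCache(16 * 1024L * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Reuse the existing table config so defaults such as the BloomFilter are preserved
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setBlockSize(16 * 1024L);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Close any objects that extend org.rocksdb.RocksObject to avoid native memory leaks
        cache.close();
    }
}
The setter would then be registered via streamsSettings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);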
state.dir
The state directory. Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting machine that is located under the state directory. The name of the subdirectory is the application ID. The state stores associated with the application are created under this subdirectory. When running multiple instances of the same application on a single machine, this path must be unique for each such instance.

topology.optimization
You can tell Streams to apply topology optimizations by setting this config. The optimizations are currently all or none and disabled by default. These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. It is recommended to enable this.
Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to StreamsConfig.OPTIMIZE, you'll need to pass in your configuration properties when building your topology by using the overloaded StreamsBuilder.build(Properties) method. For example, KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties).

upgrade.from
The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide. You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path when upgrading from below version 2.0, or when upgrading to 2.4+ from any version lower than 2.4.
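A hedged sketch (not part of the patch) showing how these three settings might be supplied together; the path and the upgrade-from version are placeholders, and streamsBuilder is assumed to be an existing StreamsBuilder:
Properties streamsSettings = new Properties();
// Per-instance state directory; must be unique per instance on the same machine
streamsSettings.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/instance-1");
// Enable all topology optimizations
streamsSettings.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
// Only needed during a rolling upgrade, e.g. when coming from 2.3
streamsSettings.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23);
// Pass the properties into build() so the optimizations are actually applied
KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(streamsSettings), streamsSettings);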
You can specify parameters for the Kafka consumers, producers, and admin client that are used internally. The consumer, producer and admin client settings are defined by specifying parameters in a StreamsConfig instance.
In this example, the Kafka consumer session timeout is configured to be 60000 milliseconds in the Streams settings:
Properties streamsSettings = new Properties();
 // Example of a "normal" setting for Kafka Streams
 streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
 // Customize the Kafka consumer settings of your Streams application
 streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
Some consumer, producer and admin client configuration parameters use the same parameter name, and the Kafka Streams library itself also uses some parameters that share the same name with its embedded clients. For example, send.buffer.bytes and receive.buffer.bytes are used to configure TCP buffers; request.timeout.ms and retry.backoff.ms control retries for client requests; retries configures how many retries are allowed when handling retriable errors from broker request responses. You can avoid duplicate names by prefixing parameter names with consumer., producer., or admin. (e.g., consumer.send.buffer.bytes and producer.send.buffer.bytes).
Properties streamsSettings = new Properties();
 // same value for consumer, producer, and admin client
 streamsSettings.put("PARAMETER_NAME", "value");
 // different values for consumer and producer
@@ -813,14 +810,14 @@
 streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");
 streamsSettings.put(StreamsConfig.adminClientPrefix("PARAMETER_NAME"), "admin-value");
You could further separate consumer configuration by adding different prefixes:
main.consumer. for the main consumer, which is the default consumer of a stream source.
restore.consumer. for the restore consumer, which is in charge of state store recovery.
global.consumer. for the global consumer, which is used in global KTable construction.
For example, if you only want to set the restore consumer config without touching other consumers' settings, you could simply use restore.consumer. to set the config.
Properties streamsSettings = new Properties();
 // same config value for all consumer types
 streamsSettings.put("consumer.PARAMETER_NAME", "general-consumer-value");
 // set a different restore consumer config. This would make restore consumer take restore-consumer-value,
@@ -829,103 +826,103 @@
 // alternatively, you can use
 streamsSettings.put(StreamsConfig.restoreConsumerPrefix("PARAMETER_NAME"), "restore-consumer-value");
The same applies to main.consumer. and global.consumer., if you only want to specify config for one consumer type.
Additionally, to configure the internal repartition/changelog topics, you could use the topic. prefix, followed by any of the standard topic configs.
Properties streamsSettings = new Properties();
 // Override default for both changelog and repartition topics
 streamsSettings.put("topic.PARAMETER_NAME", "topic-value");
 // alternatively, you can use
 streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");
Kafka Streams uses different default values for some of the underlying client configs, which are summarized below. For detailed descriptions of these configs, see Producer Configs and Consumer Configs.

| Parameter Name | Corresponding Client | Streams Default |
|---|---|---|
| auto.offset.reset | Consumer | earliest |
| linger.ms | Producer | 100 |
| max.poll.interval.ms | Consumer | Integer.MAX_VALUE |
| max.poll.records | Consumer | 1000 |

Kafka Streams assigns the following configuration parameters. If you try to change allow.auto.create.topics, your value is ignored and setting it has no effect in a Kafka Streams application. You can set the other parameters. Kafka Streams sets them to different default values than a plain KafkaConsumer.

Kafka Streams uses the client.id parameter to compute derived client IDs for internal clients. If you don't set client.id, Kafka Streams sets it to <application.id>-<random-UUID>.

| Parameter Name | Corresponding Client | Streams Default |
|---|---|---|
| allow.auto.create.topics | Consumer | false |
| auto.offset.reset | Consumer | earliest |
| linger.ms | Producer | 100 |
| max.poll.interval.ms | Consumer | 300000 |
| max.poll.records | Consumer | 1000 |

enable.auto.commit
The consumer auto commit. To guarantee at-least-once processing semantics and turn off auto commits, Kafka Streams overrides this consumer config value to false. Consumers will only commit explicitly via commitSync calls when the Kafka Streams library or a user decides to commit the current processing state.
There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in the face of broker failures:
@@ -978,13 +975,12 @@
Properties streamsSettings = new Properties();
 streamsSettings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
 streamsSettings.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);
-streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
-If you want to override serdes selectively, i.e., keep the defaults for some fields, then don’t specify the serde whenever you want to leverage the default settings:
import org.apache.kafka.common.serialization.Serde;
@@ -89,8 +87,7 @@
 // but override the default serializer for record values (here: userCount as Long).
 final Serde<Long> longSerde = Serdes.Long();
 KStream<String, Long> userCountByRegion = ...;
-userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.Long()));
-If some of your incoming records are corrupted or ill-formatted, they will cause the deserializer class to report an error.
         Since 1.0.x we have introduced a DeserializationExceptionHandler interface which allows
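As an illustrative sketch (not from the patch), such a handler is typically registered via the default.deserialization.exception.handler config; here using the built-in LogAndContinueExceptionHandler:
Properties streamsSettings = new Properties();
// Log and skip corrupted records instead of failing the application
streamsSettings.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    org.apache.kafka.streams.errors.LogAndContinueExceptionHandler.class);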
@@ -104,12 +101,11 @@
         
Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as byte[] in
           its kafka-clients Maven artifact:
<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
     <version>{{fullDotVersion}}</version>
</dependency>
         This artifact provides the following serde implementations under the package org.apache.kafka.common.serialization, which you can leverage when e.g., defining default serializers in your Streams configuration.
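For instance, a brief sketch (not part of the patch) of using the built-in String and Long serdes as application-wide defaults in the Streams configuration:
Properties streamsSettings = new Properties();
// Use the built-in serdes from org.apache.kafka.common.serialization as default key/value serdes
streamsSettings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsSettings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());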
@@ -467,8 +462,7 @@
     }
   );
 
// Java 7 example: cf. `map` for how to create `KeyValueMapper` instances
                             
                         
                     
@@ -486,8 +480,7 @@
 KStream<byte[], String> sentences = ...;
 KStream<byte[], String> words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
 
// Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances
                             
                         
                     
@@ -504,7 +497,7 @@
                                further processing of the input data (unlike
Evaluates a boolean function for each element and drops those for which the function returns true. (KStream details, KTable details)
KStream<String, Long> stream = ...;
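A hedged sketch (not in the patch): this reads like the inverse filter (filterNot) operation, which could continue along these lines:
// Drop all records whose value is zero or negative
KStream<String, Long> onlyPositives = stream.filterNot((key, value) -> value <= 0);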