From 718d6f2475d377fd220da9898d25f7c8191f98cd Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Mon, 4 Jun 2018 13:43:20 -0700
Subject: [PATCH] MINOR: Remove deprecated KafkaStreams constructors in docs
 (#5118)

Reviewers: Bill Bejeck, Matthias J. Sax
---
 .../developer-guide/config-streams.html       | 20 ++--------
 docs/streams/developer-guide/datatypes.html   |  6 ++--
 docs/streams/developer-guide/dsl-api.html     | 10 +++---
 .../developer-guide/interactive-queries.html  | 15 ++++----
 docs/streams/developer-guide/memory-mgmt.html | 14 ++++----
 docs/streams/developer-guide/security.html    |  3 +-
 docs/streams/developer-guide/testing.html     | 34 +++++++++----------
 .../developer-guide/write-streams.html        |  7 ++--
 docs/streams/index.html                       | 28 +++++++--------
 docs/streams/tutorial.html                    |  4 +--
 .../apache/kafka/streams/KafkaStreams.java    |  3 +-
 11 files changed, 61 insertions(+), 83 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 753ddc7e2de..2b6ade5c513 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -34,13 +34,11 @@

Configuring a Streams Application

-Kafka and Kafka Streams configuration options must be configured before using Streams. You can configure Kafka Streams by specifying parameters in a StreamsConfig instance.
+Kafka and Kafka Streams configuration options must be configured before using Streams. You can configure Kafka Streams by specifying parameters in a java.util.Properties instance.

   1. Create a java.util.Properties instance.

-  2. Set the parameters.
-
-  3. Construct a StreamsConfig instance from the Properties instance. For example:
+  2. Set the parameters. For example:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

@@ -50,9 +48,6 @@
 settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
 // Any further settings
 settings.put(... , ...);
-
-// Create an instance of StreamsConfig from the Properties instance
-StreamsConfig config = new StreamsConfig(settings);

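With the StreamsConfig step gone, the Properties instance is handed directly to the KafkaStreams constructor. A minimal sketch (assuming the settings instance above and a topology defined elsewhere):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;

    StreamsBuilder builder = new StreamsBuilder();
    // ... define the processing topology here (assumed) ...

    // Pass the Properties instance straight to the client
    KafkaStreams streams = new KafkaStreams(builder.build(), settings);
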
@@ -520,15 +515,13 @@

Kafka consumers and producer configuration parameters

-You can specify parameters for the Kafka consumers and producers that are used internally. The consumer and producer settings are defined by specifying parameters in a StreamsConfig instance.
+You can specify parameters for the Kafka consumers and producers that are used internally.

In this example, the Kafka consumer session timeout is configured to be 60000 milliseconds in the Streams settings:

Properties streamsSettings = new Properties();
 // Example of a "normal" setting for Kafka Streams
 streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
 // Customize the Kafka consumer settings of your Streams application
 streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
-StreamsConfig config = new StreamsConfig(streamsSettings);
 
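A related sketch (not part of this patch): when the same parameter name exists for both client types, the consumerPrefix and producerPrefix helpers on StreamsConfig scope a setting to only one of them.

    Properties streamsSettings = new Properties();
    // Applies only to the internally used consumers
    streamsSettings.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 60000);
    // Applies only to the internally used producers (value chosen for illustration)
    streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
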
@@ -706,18 +699,11 @@

replication.factor

 You define these settings via StreamsConfig:

 Properties streamsSettings = new Properties();
 streamsSettings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
 streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

-Note
-
-A future version of Kafka Streams will allow developers to set their own app-specific configuration settings through StreamsConfig as well, which can then be accessed through ProcessorContext.
diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index d8d7b4c985a..11208154fae 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -36,7 +36,7 @@

Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. java.lang.String) to materialize the data when necessary. Operations that require such SerDes information include: stream(), table(), to(), through(), groupByKey(), groupBy().

You can provide SerDes by using either of these methods:

-  • By setting default SerDes via a StreamsConfig instance.
+  • By setting default SerDes in the java.util.Properties config instance.
   • By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults.
@@ -55,7 +55,7 @@

Configuring SerDes

-SerDes specified in the Streams configuration via StreamsConfig are used as the default in your Kafka Streams application.
+SerDes specified in the Streams configuration are used as the default in your Kafka Streams application.

import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
 
@@ -64,8 +64,6 @@
 settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 // Default serde for values of data records (here: built-in serde for Long type)
 settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
-
-StreamsConfig config = new StreamsConfig(settings);
 
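For the second method, a brief sketch (not part of this patch) of overriding the defaults for a single operation with the DSL's Consumed helper; the topic name is purely illustrative:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;

    StreamsBuilder builder = new StreamsBuilder();
    // Explicit serdes for this one stream override the configured defaults
    // ("user-clicks" is a hypothetical topic name)
    KStream<String, Long> userClicks =
        builder.stream("user-clicks", Consumed.with(Serdes.String(), Serdes.Long()));
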
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 7c0b7368890..cd3a9656d12 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3044,14 +3044,14 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
 // Write the stream to the output topic, using the configured default key
-// and value serdes of your `StreamsConfig`.
+// and value serdes.
 stream.to("my-stream-output-topic");

 // Same for table
 table.to("my-table-output-topic");

 // Write the stream to the output topic, using explicit key and value serdes,
-// (thus overriding the defaults of your `StreamsConfig`).
+// (thus overriding the defaults in the config properties).
 stream.to("my-stream-output-topic", Produced.with(Serdes.String(), Serdes.Long()));

@@ -3131,7 +3131,7 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
-Testing a Streams application
+Testing a Streams application

Kafka Streams comes with a test-utils module to help you test your application; see the Testing section for details.
@@ -3201,7 +3201,7 @@ import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 object WordCountApplication extends App {
   import Serdes._

-  val config: Properties = {
+  val props: Properties = {
     val p = new Properties()
     p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
     p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
@@ -3216,7 +3216,7 @@ object WordCountApplication extends App {
     .count(Materialized.as("counts-store"))
   wordCounts.toStream.to("WordsWithCountsTopic")

-  val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
+  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
   streams.start()

   sys.ShutdownHookThread {

diff --git a/docs/streams/developer-guide/interactive-queries.html b/docs/streams/developer-guide/interactive-queries.html
index 9b64ddbee1c..051f87c87d5 100644
--- a/docs/streams/developer-guide/interactive-queries.html
+++ b/docs/streams/developer-guide/interactive-queries.html
@@ -129,7 +129,7 @@

Querying local key-value stores

To query a local key-value store, you must first create a topology with a key-value store. This example creates a key-value store named “CountsKeyValueStore”. This store will hold the latest count for any word that is found on the topic “word-count-input”.

-StreamsConfig config = ...;
+Properties props = ...;
 StreamsBuilder builder = ...;
 KStream<String, String> textLines = ...;
 
@@ -142,7 +142,7 @@
groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsKeyValueStore"));
 
 // Start an instance of the topology
-KafkaStreams streams = new KafkaStreams(builder, config);
+KafkaStreams streams = new KafkaStreams(builder.build(), props);
 streams.start();
 
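A short sketch (not part of this patch) of the query side, using the queryable-store API; the key "hello" is only an illustration:

    ReadOnlyKeyValueStore<String, Long> keyValueStore =
        streams.store("CountsKeyValueStore", QueryableStoreTypes.keyValueStore());

    // Point lookup for one word (hypothetical key)
    Long count = keyValueStore.get("hello");

    // Or scan the full store
    try (KeyValueIterator<String, Long> range = keyValueStore.all()) {
        while (range.hasNext()) {
            KeyValue<String, Long> next = range.next();
            System.out.println("count for " + next.key + ": " + next.value);
        }
    }
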
@@ -171,7 +171,7 @@

You can also materialize the results of stateless operators by using the overloaded methods that take a queryableStoreName as shown in the example below:

-StreamsConfig config = ...;
 StreamsBuilder builder = ...;
 KTable<String, Integer> regionCounts = ...;
 
@@ -192,7 +192,7 @@
                     However, there is only one result per window for a given key.

To query a local window store, you must first create a topology with a window store. This example creates a window store named “CountsWindowStore” that contains the counts for words in 1-minute windows.

-StreamsConfig config = ...;
 StreamsBuilder builder = ...;
 KStream<String, String> textLines = ...;
 
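Once such a topology is running, a sketch (not part of this patch) of fetching one key's windows over a time range; the key and range are illustrative:

    ReadOnlyWindowStore<String, Long> windowStore =
        streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());
    long timeFrom = 0L;                        // beginning of time, for illustration
    long timeTo = System.currentTimeMillis();  // up to now
    try (WindowStoreIterator<Long> iterator = windowStore.fetch("hello", timeFrom, timeTo)) {
        while (iterator.hasNext()) {
            KeyValue<Long, Long> entry = iterator.next();
            System.out.println("count for window starting at " + entry.key + ": " + entry.value);
        }
    }
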
@@ -316,7 +316,7 @@
 

You can now find and query your custom store:

-StreamsConfig config = ...;
 Topology topology = ...;
 ProcessorSupplier processorSupplier = ...;
 
@@ -368,7 +368,7 @@ interactive queries

Exposing the RPC endpoints of your application

-To enable remote state store discovery in a distributed Kafka Streams application, you must set the configuration property in StreamsConfig.
+To enable remote state store discovery in a distributed Kafka Streams application, you must set the configuration property in the config properties.
 The application.server property defines a unique host:port pair that points to the RPC endpoint of the respective instance of a Kafka Streams application. The value of this configuration property will vary across the instances of your application. When this property is set, Kafka Streams will keep track of the RPC endpoint information for every instance of an application, its state stores, and assigned stream partitions through instances of StreamsMetadata.

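A brief sketch (not part of this patch) of the discovery side once application.server is set; allMetadataForStore is the KafkaStreams method exposing the tracked StreamsMetadata:

    for (StreamsMetadata metadata : streams.allMetadataForStore("word-count")) {
        // Each entry carries the host:port advertised by one instance hosting the store
        System.out.println("word-count is served at " + metadata.host() + ":" + metadata.port());
    }
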
@@ -386,7 +386,6 @@ interactive queries

 props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, rpcEndpoint);
 // ... further settings may follow here ...
-StreamsConfig config = new StreamsConfig(props);

 StreamsBuilder builder = new StreamsBuilder();
 KStream<String, String> textLines = builder.stream("word-count-input", Consumed.with(stringSerde, stringSerde));

@@ -400,7 +399,7 @@ interactive queries

 groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-count"));

 // Start an instance of the topology
-KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+KafkaStreams streams = new KafkaStreams(builder.build(), props);
 streams.start();

 // Then, create and start the actual RPC service for remote access to this

diff --git a/docs/streams/developer-guide/memory-mgmt.html b/docs/streams/developer-guide/memory-mgmt.html
index a73a814be11..7ae20600df5 100644
--- a/docs/streams/developer-guide/memory-mgmt.html
+++ b/docs/streams/developer-guide/memory-mgmt.html
@@ -80,8 +80,8 @@

The cache size is specified through the cache.max.bytes.buffering parameter, which is a global setting per processing topology:

// Enable record cache of size 10 MB.
-Properties streamsConfiguration = new Properties();
-streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
+Properties props = new Properties();
+props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
 

This parameter controls the number of bytes allocated for caching. Specifically, for a processor topology instance with

@@ -105,8 +105,8 @@

  • To turn off caching the cache size can be set to zero:

    // Disable record cache
    -Properties streamsConfiguration = new Properties();
    -streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    +Properties props = new Properties();
    +props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
     

    Turning off caching might result in high write traffic for the underlying RocksDB store.

@@ -118,11 +118,11 @@

  • To enable caching but still have an upper bound on how long records will be cached, you can set the commit interval. In this example, it is set to 1000 milliseconds:

-Properties streamsConfiguration = new Properties();
+Properties props = new Properties();
     // Enable record cache of size 10 MB.
    -streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
    +props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
     // Set commit interval to 1 second.
    -streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
    +props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
     
diff --git a/docs/streams/developer-guide/security.html b/docs/streams/developer-guide/security.html
index 0604747b9d4..9c494566b6f 100644
--- a/docs/streams/developer-guide/security.html
+++ b/docs/streams/developer-guide/security.html
@@ -95,7 +95,7 @@ ssl.keystore.password=test1234
 ssl.key.password=test1234
  •
-    Configure these settings in the application for your StreamsConfig instance. These settings will encrypt any
+    Configure these settings in the application for your Properties instance. These settings will encrypt any
     data-in-transit that is being read from or written to Kafka, and your application will authenticate itself against the Kafka brokers that it is communicating with. Note that this example does not cover client authorization.

    // Code of your Java application that uses the Kafka Streams library
    @@ -115,7 +115,6 @@ ssl.key.password=test1234
     settings.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.keystore.jks");
     settings.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
     settings.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
    -StreamsConfig streamsConfiguration = new StreamsConfig(settings);
     

If you incorrectly configure a security setting in your application, it will fail at runtime, typically right after you

diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html
index ea2ae987c7e..92d8fce0490 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -86,10 +86,10 @@
 builder.stream("input-topic").filter(...).to("output-topic");
 Topology topology = builder.build();

 // setup test driver
-Properties config = new Properties();
-config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
-config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
-TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
+Properties props = new Properties();
+props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
+props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
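A short usage sketch (not part of this patch; topic names follow the snippet above) of feeding the driver and reading its output with the test-utils API:

    ConsumerRecordFactory<String, String> factory =
        new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());
    // Key and value here are illustrative
    testDriver.pipeInput(factory.create("key", "value"));

    ProducerRecord<String, String> outputRecord =
        testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());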

The test driver accepts ConsumerRecords with key and value type byte[].

@@ -171,12 +171,12 @@ public void setup() {
     topology.addSink("sinkProcessor", "result-topic", "aggregator");

     // setup test driver
-    Properties config = new Properties();
-    config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation");
-    config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
-    config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
-    testDriver = new TopologyTestDriver(topology, config);
+    Properties props = new Properties();
+    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation");
+    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+    props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
+    testDriver = new TopologyTestDriver(topology, props);

     // pre-populate store
     store = testDriver.getKeyValueStore("aggStore");

@@ -318,13 +318,13 @@ processorUnderTest.init(context);
 If you need to pass configuration to your processor or set the default serdes, you can create the mock with config:

    -final Properties config = new Properties();
    -config.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test");
    -config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
    -config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    -config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    -config.put("some.other.config", "some config value");
    -final MockProcessorContext context = new MockProcessorContext(config);
    +final Properties props = new Properties();
    +props.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test");
    +props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
    +props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    +props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    +props.put("some.other.config", "some config value");
    +final MockProcessorContext context = new MockProcessorContext(props);
                     
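A brief follow-up sketch (assumed processor and record values, not part of this patch) showing how the mock context is then exercised and inspected:

    final Processor<String, String> processorUnderTest = ...;
    processorUnderTest.init(context);
    processorUnderTest.process("a-key", "a-value");  // illustrative record

    // Anything the processor forwarded can be inspected afterwards
    for (final MockProcessorContext.CapturedForward forward : context.forwarded()) {
        System.out.println(forward.keyValue());
    }
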

Captured data

diff --git a/docs/streams/developer-guide/write-streams.html b/docs/streams/developer-guide/write-streams.html
index 0007b3e5ff8..145eb307a4f 100644
--- a/docs/streams/developer-guide/write-streams.html
+++ b/docs/streams/developer-guide/write-streams.html
@@ -119,11 +119,10 @@
  • The first argument of the KafkaStreams constructor takes a topology (either StreamsBuilder#build() for the DSL or Topology for the Processor API) that defines the application's processing logic.
  • -
  • The second argument is an instance of StreamsConfig, which defines the configuration for this specific topology.
  • +
  • The second argument is an instance of java.util.Properties, which defines the configuration for this specific topology.
  • Code example:

    import org.apache.kafka.streams.KafkaStreams;
    -import org.apache.kafka.streams.StreamsConfig;
     import org.apache.kafka.streams.StreamsBuilder;
     import org.apache.kafka.streams.Topology;
     
    @@ -142,9 +141,9 @@
     // Use the configuration to tell your application where the Kafka cluster is,
     // which Serializers/Deserializers to use by default, to specify security settings,
     // and so on.
    -StreamsConfig config = ...;
    +Properties props = ...;
     
    -KafkaStreams streams = new KafkaStreams(topology, config);
    +KafkaStreams streams = new KafkaStreams(topology, props);
     
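    Construction alone does not start processing (as noted just below); a minimal sketch (assumed shutdown-hook pattern, not part of this patch) of starting and cleanly closing the client:

        streams.start();
        // Close the client on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));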

    At this point, internal structures are initialized, but the processing is not started yet.

diff --git a/docs/streams/index.html b/docs/streams/index.html
index 6dfaf6ba2a3..193a7b27989 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -172,11 +172,11 @@ public class WordCountApplication {

     public static void main(final String[] args) throws Exception {
-        Properties config = new Properties();
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
-        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
-        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

         StreamsBuilder builder = new StreamsBuilder();
         KStream<String, String> textLines = builder.stream("TextLinesTopic");
@@ -186,7 +186,7 @@
             .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
         wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

-        KafkaStreams streams = new KafkaStreams(builder.build(), config);
+        KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();
     }
@@ -215,11 +215,11 @@ public class WordCountApplication {

     public static void main(final String[] args) throws Exception {
-        Properties config = new Properties();
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
-        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
-        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

         StreamsBuilder builder = new StreamsBuilder();
         KStream<String, String> textLines = builder.stream("TextLinesTopic");
@@ -241,7 +241,7 @@
         wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

-        KafkaStreams streams = new KafkaStreams(builder.build(), config);
+        KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.start();
     }
@@ -263,7 +263,7 @@ import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 object WordCountApplication extends App {
   import Serdes._

-  val config: Properties = {
+  val props: Properties = {
     val p = new Properties()
     p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
     p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
@@ -278,7 +278,7 @@ object WordCountApplication extends App {
     .count(Materialized.as("counts-store"))
   wordCounts.toStream.to("WordsWithCountsTopic")

-  val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
+  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
   streams.start()

   sys.ShutdownHookThread {

diff --git a/docs/streams/tutorial.html b/docs/streams/tutorial.html
index 21ff0307d8a..0006e3ef535 100644
--- a/docs/streams/tutorial.html
+++ b/docs/streams/tutorial.html
@@ -208,9 +208,7 @@

 Note that we can always describe the topology as we did above at any given point while we are building it in the code, so as a user you can interactively "try and taste" your computational logic defined in the topology until you are happy with it. Suppose we are already done with this simple topology that just pipes data from one Kafka topic to another in an endless streaming manner,
-we can now construct the Streams client with the two components we have just constructed above: the configuration map and the topology object
-(one can also construct a StreamsConfig object from the props map and then pass that object to the constructor,
-KafkaStreams have overloaded constructor functions to takes either type).
+we can now construct the Streams client with the two components we have just constructed above: the configuration map specified in a java.util.Properties instance and the Topology object.

    diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
    index e1093452e66..d6002ff016b 100644
    --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
    +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
    @@ -109,12 +109,11 @@ import static org.apache.kafka.common.utils.Utils.getPort;
      * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    - * StreamsConfig config = new StreamsConfig(props);
      *
      * StreamsBuilder builder = new StreamsBuilder();
      * builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");
      *
    - * KafkaStreams streams = new KafkaStreams(builder.build(), config);
    + * KafkaStreams streams = new KafkaStreams(builder.build(), props);
      * streams.start();
      * }
    *