diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index 8c45d0c7976..eb65846857d 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -1222,7 +1222,7 @@ streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. These optimizations will save on network traffic and storage in Kafka without changing the semantics of your applications. Enabling them is recommended.

- Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to StreamsConfig.OPTIMIZE, you'll need to pass in your + Note that you need to do two things to enable optimizations. In addition to setting this config to StreamsConfig.OPTIMIZE, you'll need to pass in your configuration properties when building your topology by using the overloaded StreamsBuilder.build(Properties) method. For example KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties).
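As a rough sketch of these two steps (the application id and bootstrap address below are placeholders, not part of this documentation):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // placeholder id
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
    // Step 1: request optimizations in the configuration.
    properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    // ... define your topology on streamsBuilder ...

    // Step 2: pass the same properties to build() so the optimizer can see the config,
    // then hand the optimized topology and the properties to KafkaStreams.
    KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties);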

@@ -1235,7 +1235,7 @@ streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide. You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path - when upgrading from below version 2.0, or when upgrading to 2.4+ from any version lower than 2.4. + when upgrading to 3.4+ from any version lower than 3.4. diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 99c221cbbb5..ecd2a3bbfe9 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -35,9 +35,8 @@

Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 3.4 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config upgrade.from="older version" - (possible values are "0.10.0" - "3.4") and during the second you remove it. This is required to safely handle 3 changes. The first is introduction of the new cooperative rebalancing protocol of the embedded consumer. The second is a change in foreign-key join serialization format. - Note that you will remain using the old eager rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details please refer to - KIP-429. The third is a change in the serialization format for an internal repartition topic. For more details, please refer to KIP-904: + (possible values are "2.4" - "3.4") and during the second you remove it. This is required to safely handle 2 changes. The first is a change in foreign-key join serialization format. + The second is a change in the serialization format for an internal repartition topic. For more details, please refer to KIP-904:

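To make the rolling-bounce procedure concrete, here is a minimal sketch of the configuration change (the application id is a placeholder, and "3.4" stands in for whichever supported version you are actually upgrading from):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");  // placeholder application id
    // First rolling bounce: deploy the new jars but keep the old on-the-wire formats.
    props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "3.4");            // the version you are upgrading from
    // ... restart each instance with these properties ...

    // Second rolling bounce, once every instance runs the new version:
    props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);
    // ... restart each instance again, now without upgrade.from ...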
-

As an alternative, an offline upgrade is also possible. Upgrading from any versions as old as 0.10.0.x to {{fullDotVersion}} in offline mode require the following steps:

+

As an alternative, an offline upgrade is also possible. Upgrading from any version as old as 0.11.0.x to {{fullDotVersion}} in offline mode requires the following steps:

-

- Note: The cooperative rebalancing protocol has been the default since 2.4, but we have continued to support the - eager rebalancing protocol to provide users an upgrade path. This support will be dropped in a future release, - so any users still on the eager protocol should prepare to finish upgrading their applications to the cooperative protocol in version 3.1. - This only affects users who are still on a version older than 2.4, and users who have upgraded already but have not yet - removed the upgrade.from config that they set when upgrading from a version below 2.4. - Users fitting into the latter case will simply need to unset this config when upgrading beyond 3.1, - while users in the former case will need to follow a slightly different upgrade path if they attempt to upgrade from 2.3 or below to a version above 3.1. - Those applications will need to go through a bridge release, by first upgrading to a version between 2.4 - 3.1 and setting the upgrade.from config, - then removing that config and upgrading to the final version above 3.1. See KAFKA-8575 - for more details. -

For a table that shows Streams API compatibility with Kafka broker versions, see Broker Compatibility.

@@ -121,24 +108,6 @@

Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires MacOS 10.14 or higher.

-

- To run a Kafka Streams application version 2.2.1, 2.3.0, or higher a broker version 0.11.0 or higher is required - and the on-disk message format must be 0.11 or higher. - Brokers must be on version 0.10.1 or higher to run a Kafka Streams application version 0.10.1 to 2.2.0. - Additionally, on-disk message format must be 0.10 or higher to run a Kafka Streams application version 1.0 to 2.2.0. - For Kafka Streams 0.10.0, broker version 0.10.0 or higher is required. -

- -

- In deprecated KStreamBuilder class, when a KTable is created from a source topic via KStreamBuilder.table(), its materialized state store - will reuse the source topic as its changelog topic for restoring, and will disable logging to avoid appending new updates to the source topic; in the StreamsBuilder class introduced in 1.0, this behavior was changed - accidentally: we still reuse the source topic as the changelog topic for restoring, but will also create a separate changelog topic to append the update records from source topic to. In the 2.0 release, we have fixed this issue and now users - can choose whether or not to reuse the source topic based on the StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG: if you are upgrading from the old KStreamBuilder class and hence you need to change your code to use - the new StreamsBuilder, you should set this config value to StreamsConfig#OPTIMIZE to continue reusing the source topic; if you are upgrading from 1.0 or 1.1 where you are already using StreamsBuilder and hence have already - created a separate changelog topic, you should set this config value to StreamsConfig#NO_OPTIMIZATION when upgrading to {{fullDotVersion}} in order to use that changelog topic for restoring the state store. - More details about the new config StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG can be found in KIP-295. -

-

Streams API changes in 4.1.0

Early Access of the Streams Rebalance Protocol

@@ -1150,705 +1119,9 @@ Hence, this feature won't be supported any longer and you need to update your code accordingly. If you use a custom PartitionGrouper and stop using it, the created tasks might change. Hence, you will need to reset your application to upgrade it. - - -

Streams API changes in 2.3.0

- -

Version 2.3.0 adds the Suppress operator to the kafka-streams-scala Ktable API.

- -

- As of 2.3.0 Streams now offers an in-memory version of the window (KIP-428) - and the session (KIP-445) store, in addition to the persistent ones based on RocksDB. - The new public interfaces inMemoryWindowStore() and inMemorySessionStore() are added to Stores and provide the built-in in-memory window or session store.

-

- As of 2.3.0 we've updated how to turn on optimizations. Now to enable optimizations, you need to do two things. - First add this line to your properties properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);, as you have done before. - Second, when constructing your KafkaStreams instance, you'll need to pass your configuration properties when building your - topology by using the overloaded StreamsBuilder.build(Properties) method. - For example KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties). -

- -

- In 2.3.0 we have added default implementation to close() and configure() for Serializer, - Deserializer and Serde so that they can be implemented by lambda expression. - For more details please read KIP-331. -

- -

- To improve operator semantics, new store types are added that allow storing an additional timestamp per key-value pair or window. - Some DSL operators (for example KTables) are using those new stores. - Hence, you can now retrieve the last update timestamp via Interactive Queries if you specify - TimestampedKeyValueStoreType or TimestampedWindowStoreType as your QueryableStoreType. - While this change is mainly transparent, there are some corner cases that may require code changes: - Caution: If you receive an untyped store and use a cast, you might need to update your code to cast to the correct type. - Otherwise, you might get an exception similar to - java.lang.ClassCastException: class org.apache.kafka.streams.state.ValueAndTimestamp cannot be cast to class YOUR-VALUE-TYPE - upon getting a value from the store. - Additionally, TopologyTestDriver#getStateStore() only returns non-built-in stores and throws an exception if a built-in store is accessed. - For more details please read KIP-258. -

- -

- To improve type safety, a new operator KStream#flatTransformValues is added. - For more details please read KIP-313. -

- -

- Kafka Streams used to set the configuration parameter max.poll.interval.ms to Integer.MAX_VALUE. - This default value is removed and Kafka Streams uses the consumer default value now. - For more details please read KIP-442. -

- -

- Default configuration for repartition topic was changed: - The segment size for index files (segment.index.bytes) is no longer 50MB, but uses the cluster default. - Similarly, the configuration segment.ms in no longer 10 minutes, but uses the cluster default configuration. - Lastly, the retention period (retention.ms) is changed from Long.MAX_VALUE to -1 (infinite). - For more details please read KIP-443. -

- -

- To avoid memory leaks, RocksDBConfigSetter has a new close() method that is called on shutdown. - Users should implement this method to release any memory used by RocksDB config objects, by closing those objects. - For more details please read KIP-453. -

- -

- RocksDB dependency was updated to version 5.18.3. - The new version allows to specify more RocksDB configurations, including WriteBufferManager which helps to limit RocksDB off-heap memory usage. - For more details please read KAFKA-8215. -

- -

Streams API changes in 2.2.0

-

- We've simplified the KafkaStreams#state transition diagram during the starting up phase a bit in 2.2.0: in older versions the state will transit from CREATED to RUNNING, and then to REBALANCING to get the first - stream task assignment, and then back to RUNNING; starting in 2.2.0 it will transit from CREATED directly to REBALANCING and then to RUNNING. - If you have registered a StateListener that captures state transition events, you may need to adjust your listener implementation accordingly for this simplification (in practice, your listener logic should be very unlikely to be affected at all). -

- -

- In WindowedSerdes, we've added a new static constructor to return a TimeWindowSerde with configurable window size. This is to help users to construct time window serdes to read directly from a time-windowed store's changelog. - More details can be found in KIP-393. -

- -

- In 2.2.0 we have extended a few public interfaces including KafkaStreams to extend AutoCloseable so that they can be - used in a try-with-resource statement. For a full list of public interfaces that get impacted please read KIP-376. -

- -

Streams API changes in 2.1.0

-

- We updated TopologyDescription API to allow for better runtime checking. - Users are encouraged to use #topicSet() and #topicPattern() accordingly on TopologyDescription.Source nodes, - instead of using #topics(), which has since been deprecated. Similarly, use #topic() and #topicNameExtractor() - to get descriptions of TopologyDescription.Sink nodes. For more details, see - KIP-321. -

- -

- We've added a new class Grouped and deprecated Serialized. The intent of adding Grouped is the ability to - name repartition topics created when performing aggregation operations. Users can name the potential repartition topic using the - Grouped#as() method which takes a String and is used as part of the repartition topic name. The resulting repartition - topic name will still follow the pattern of ${application-id}->name<-repartition. The Grouped class is now favored over - Serialized in KStream#groupByKey(), KStream#groupBy(), and KTable#groupBy(). - Note that Kafka Streams does not automatically create repartition topics for aggregation operations. - - Additionally, we've updated the Joined class with a new method Joined#withName - enabling users to name any repartition topics required for performing Stream/Stream or Stream/Table join. For more details repartition - topic naming, see KIP-372. - - As a result we've updated the Kafka Streams Scala API and removed the Serialized class in favor of adding Grouped. - If you just rely on the implicit Serialized, you just need to recompile; if you pass in Serialized explicitly, sorry you'll have to make code changes. -

- -

- We've added a new config named max.task.idle.ms to allow users specify how to handle out-of-order data within a task that may be processing multiple - topic-partitions (see Out-of-Order Handling section for more details). - The default value is set to 0, to favor minimized latency over synchronization between multiple input streams from topic-partitions. - If users would like to wait for longer time when some of the topic-partitions do not have data available to process and hence cannot determine its corresponding stream time, - they can override this config to a larger value. -

- -

- We've added the missing SessionBytesStoreSupplier#retentionPeriod() to be consistent with the WindowBytesStoreSupplier which allows users to get the specified retention period for session-windowed stores. - We've also added the missing StoreBuilder#withCachingDisabled() to allow users to turn off caching for their customized stores. -

- -

- We added a new serde for UUIDs (Serdes.UUIDSerde) that you can use via Serdes.UUID() - (cf. KIP-206). -

- -

- We updated a list of methods that take long arguments as either timestamp (fix point) or duration (time period) - and replaced them with Instant and Duration parameters for improved semantics. - Some old methods base on long are deprecated and users are encouraged to update their code. -
- In particular, aggregation windows (hopping/tumbling/unlimited time windows and session windows) as well as join windows now take Duration - arguments to specify window size, hop, and gap parameters. - Also, window sizes and retention times are now specified as Duration type in Stores class. - The Window class has new methods #startTime() and #endTime() that return window start/end timestamp as Instant. - For interactive queries, there are new #fetch(...) overloads taking Instant arguments. - Additionally, punctuations are now registered via ProcessorContext#schedule(Duration interval, ...). - For more details, see KIP-358. -

- -

- We deprecated KafkaStreams#close(...) and replaced it with KafkaStreams#close(Duration) that accepts a single timeout argument - Note: the new #close(Duration) method has improved (but slightly different) semantics. - For more details, see KIP-358. -

- -

- The newly exposed AdminClient metrics are now available when calling the KafkaStream#metrics() method. - For more details on exposing AdminClients metrics - see KIP-324 -

- -

- We deprecated the notion of segments in window stores as those are intended to be an implementation details. - Thus, method Windows#segments() and variable Windows#segments were deprecated. - If you implement custom windows, you should update your code accordingly. - Similarly, WindowBytesStoreSupplier#segments() was deprecated and replaced with WindowBytesStoreSupplier#segmentInterval(). - If you implement custom window store, you need to update your code accordingly. - Finally, Stores#persistentWindowStore(...) were deprecated and replaced with a new overload that does not allow to specify the number of segments any longer. - For more details, see KIP-319 - (note: KIP-328 and - KIP-358 'overlap' with KIP-319). -

- -

- We've added an overloaded StreamsBuilder#build method that accepts an instance of java.util.Properties with the intent of using the - StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG config added in Kafka Streams 2.0. Before 2.1, when building a topology with - the DSL, Kafka Streams writes the physical plan as the user makes calls on the DSL. Now by providing a java.util.Properties instance when - executing a StreamsBuilder#build call, Kafka Streams can optimize the physical plan of the topology, provided the StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG - config is set to StreamsConfig#OPTIMIZE. By setting StreamsConfig#OPTIMIZE in addition to the KTable optimization of - reusing the source topic as the changelog topic, the topology may be optimized to merge redundant repartition topics into one - repartition topic. The original no parameter version of StreamsBuilder#build is still available for those who wish to not - optimize their topology. Note that enabling optimization of the topology may require you to do an application reset when redeploying the application. For more - details, see KIP-312 -

- -

- We are introducing static membership towards Kafka Streams user. This feature reduces unnecessary rebalances during normal application upgrades or rolling bounces. - For more details on how to use it, checkout static membership design. - Note, Kafka Streams uses the same ConsumerConfig#GROUP_INSTANCE_ID_CONFIG, and you only need to make sure it is uniquely defined across - different stream instances in one application. -

- -

Streams API changes in 2.0.0

-

- In 2.0.0 we have added a few new APIs on the ReadOnlyWindowStore interface (for details please read Streams API changes below). - If you have customized window store implementations that extends the ReadOnlyWindowStore interface you need to make code changes. -

- -

- In addition, if you using Java 8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities. - Hot-swapping the jar-file only might not work for this case. - See below a complete list of 2.0.0 - API and semantic changes that allow you to advance your application and/or simplify your code base. -

- -

- We moved Consumed interface from org.apache.kafka.streams to org.apache.kafka.streams.kstream - as it was mistakenly placed in the previous release. If your code has already used it there is a simple one-liner change needed in your import statement. -

- -

- We have also removed some public APIs that are deprecated prior to 1.0.x in 2.0.0. - See below for a detailed list of removed APIs. -

-

- We have removed the skippedDueToDeserializationError-rate and skippedDueToDeserializationError-total metrics. - Deserialization errors, and all other causes of record skipping, are now accounted for in the pre-existing metrics - skipped-records-rate and skipped-records-total. When a record is skipped, the event is - now logged at WARN level. If these warnings become burdensome, we recommend explicitly filtering out unprocessable - records instead of depending on record skipping semantics. For more details, see - KIP-274. - As of right now, the potential causes of skipped records are: -

- - -

- We've also fixed the metrics name for time and session windowed store operations in 2.0. As a result, our current built-in stores - will have their store types in the metric names as in-memory-state, in-memory-lru-state, - rocksdb-state, rocksdb-window-state, and rocksdb-session-state. For example, a RocksDB time windowed store's - put operation metrics would now be - kafka.streams:type=stream-rocksdb-window-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),rocksdb-window-state-id=([-.\w]+). - Users need to update their metrics collecting and reporting systems for their time and session windowed stores accordingly. - For more details, please read the State Store Metrics section. -

- -

- We have added support for methods in ReadOnlyWindowStore which allows for querying a single window's key-value pair. - For users who have customized window store implementations on the above interface, they'd need to update their code to implement the newly added method as well. - For more details, see KIP-261. -

-

- We have added public WindowedSerdes to allow users to read from / write to a topic storing windowed table changelogs directly. - In addition, in StreamsConfig we have also added default.windowed.key.serde.inner and default.windowed.value.serde.inner - to let users specify inner serdes if the default serde classes are windowed serdes. - For more details, see KIP-265. -

-

- We've added message header support in the Processor API in Kafka 2.0.0. In particular, we have added a new API ProcessorContext#headers() - which returns a Headers object that keeps track of the headers of the source topic's message that is being processed. Through this object, users can manipulate - the headers map that is being propagated throughout the processor topology as well. For more details please feel free to read - the Developer Guide section. -

-

- We have deprecated constructors of KafkaStreams that take a StreamsConfig as parameter. - Please use the other corresponding constructors that accept java.util.Properties instead. - For more details, see KIP-245. -

-

- Kafka 2.0.0 allows to manipulate timestamps of output records using the Processor API (KIP-251). - To enable this new feature, ProcessorContext#forward(...) was modified. - The two existing overloads #forward(Object key, Object value, String childName) and #forward(Object key, Object value, int childIndex) were deprecated and a new overload #forward(Object key, Object value, To to) was added. - The new class To allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record. - Forwarding based on child index is not supported in the new API any longer. -

-

- We have added support to allow routing records dynamically to Kafka topics. More specifically, in both the lower-level Topology#addSink and higher-level KStream#to APIs, we have added variants that - take a TopicNameExtractor instance instead of a specific String typed topic name, such that for each received record from the upstream processor, the library will dynamically determine which Kafka topic to write to - based on the record's key and value, as well as record context. Note that all the Kafka topics that may possibly be used are still considered as user topics and hence required to be pre-created. In addition to that, we have modified the - StreamPartitioner interface to add the topic name parameter since the topic name now may not be known beforehand; users who have customized implementations of this interface would need to update their code while upgrading their application - to use Kafka Streams 2.0.0. -

-

- KIP-284 changed the retention time for repartition topics by setting its default value to Long.MAX_VALUE. - Instead of relying on data retention Kafka Streams uses the new purge data API to delete consumed data from those topics and to keep used storage small now. -

-

- We have modified the ProcessorStateManger#register(...) signature and removed the deprecated loggingEnabled boolean parameter as it is specified in the StoreBuilder. - Users who used this function to register their state stores into the processor topology need to simply update their code and remove this parameter from the caller. -

-

- Kafka Streams DSL for Scala is a new Kafka Streams client library available for developers authoring Kafka Streams applications in Scala. It wraps core Kafka Streams DSL types to make it easier to call when - interoperating with Scala code. For example, it includes higher order functions as parameters for transformations avoiding the need anonymous classes in Java 7 or experimental SAM type conversions in Scala 2.11, - automatic conversion between Java and Scala collection types, a way - to implicitly provide Serdes to reduce boilerplate from your application and make it more typesafe, and more! For more information see the - Kafka Streams DSL for Scala documentation and - KIP-270. -

-

- We have removed these deprecated APIs: -

- - -

Streams API changes in 1.1.0

-

- We have added support for methods in ReadOnlyWindowStore which allows for querying WindowStores without the necessity of providing keys. - For users who have customized window store implementations on the above interface, they'd need to update their code to implement the newly added method as well. - For more details, see KIP-205. -

- -

- There is a new artifact kafka-streams-test-utils providing a TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier class. - You can include the new artifact as a regular dependency to your unit tests and use the test driver to test your business logic of your Kafka Streams application. - For more details, see KIP-247. -

- -

- The introduction of KIP-220 - enables you to provide configuration parameters for the embedded admin client created by Kafka Streams, similar to the embedded producer and consumer clients. - You can provide the configs via StreamsConfig by adding the configs with the prefix admin. as defined by StreamsConfig#adminClientPrefix(String) - to distinguish them from configurations of other clients that share the same config names. -

- -

- New method in KTable -

- - -

- New method in GlobalKTable -

- - -

- New methods in KafkaStreams: -

- - -

New methods in KafkaClientSupplier:

- - -

New error handling for exceptions during production:

- - -

Changes in StreamsResetter:

- - -

Streams API changes in 1.0.0

- -

- With 1.0 a major API refactoring was accomplished and the new API is cleaner and easier to use. - This change includes the five main classes KafkaStreams, KStreamBuilder, - KStream, KTable, and TopologyBuilder (and some more others). - All changes are fully backward compatible as old API is only deprecated but not removed. - We recommend to move to the new API as soon as you can. - We will summarize all API changes in the next paragraphs. -

- -

- The two main classes to specify a topology via the DSL (KStreamBuilder) - or the Processor API (TopologyBuilder) were deprecated and replaced by - StreamsBuilder and Topology (both new classes are located in - package org.apache.kafka.streams). - Note, that StreamsBuilder does not extend Topology, i.e., - the class hierarchy is different now. - The new classes have basically the same methods as the old ones to build a topology via DSL or Processor API. - However, some internal methods that were public in KStreamBuilder - and TopologyBuilder but not part of the actual API are not present - in the new classes any longer. - Furthermore, some overloads were simplified compared to the original classes. - See KIP-120 - and KIP-182 - for full details. -

- -

- Changing how a topology is specified also affects KafkaStreams constructors, - that now only accept a Topology. - Using the DSL builder class StreamsBuilder one can get the constructed - Topology via StreamsBuilder#build(). - Additionally, a new class org.apache.kafka.streams.TopologyDescription - (and some more dependent classes) were added. - Those can be used to get a detailed description of the specified topology - and can be obtained by calling Topology#describe(). - An example using this new API is shown in the quickstart section. -

- -

- New methods in KStream: -

- - -

- New methods in KafkaStreams: -

- - -

- Deprecated / modified methods in KafkaStreams: -

- - -

- Deprecated methods in KGroupedStream -

- - -

- Modified methods in Processor: -

- - -

- If you are monitoring on task level or processor-node / state store level Streams metrics, please note that the metrics sensor name and hierarchy was changed: - The task ids, store names and processor names are no longer in the sensor metrics names, but instead are added as tags of the sensors to achieve consistent metrics hierarchy. - As a result you may need to make corresponding code changes on your metrics reporting and monitoring tools when upgrading to 1.0.0. - Detailed metrics sensor can be found in the Streams Monitoring section. -

- -

- The introduction of KIP-161 - enables you to provide a default exception handler for deserialization errors when reading data from Kafka rather than throwing the exception all the way out of your streams application. - You can provide the configs via the StreamsConfig as StreamsConfig#DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG. - The specified handler must implement the org.apache.kafka.streams.errors.DeserializationExceptionHandler interface. -

- -

- The introduction of KIP-173 - enables you to provide topic configuration parameters for any topics created by Kafka Streams. - This includes repartition and changelog topics. - You can provide the configs via the StreamsConfig by adding the configs with the prefix as defined by StreamsConfig#topicPrefix(String). - Any properties in the StreamsConfig with the prefix will be applied when creating internal topics. - Any configs that aren't topic configs will be ignored. - If you already use StateStoreSupplier or Materialized to provide configs for changelogs, then they will take precedence over those supplied in the config. -

- -

Streams API changes in 0.11.0.0

- -

Updates in StreamsConfig:

- - -

New methods in TopologyBuilder:

- - -

New methods in KStreamBuilder:

- - -

Deprecated methods in KTable:

- - -

- The above methods have been deprecated in favor of using the Interactive Queries API. - If you want to query the current content of the state store backing the KTable, use the following approach: -

- -

- If you want to view the changelog stream of the KTable then you could call KTable.toStream().print(Printed.toSysOut). -

- -

Metrics using exactly-once semantics:

-

- If "exactly_once" processing (EOS version 1) is enabled via the processing.guarantee parameter, - internally Streams switches from a producer-per-thread to a producer-per-task runtime model. - Using "exactly_once_beta" (EOS version 2) does use a producer-per-thread, so client.id doesn't change, - compared with "at_least_once" for this case). - In order to distinguish the different producers, the producer's client.id additionally encodes the task-ID for this case. - Because the producer's client.id is used to report JMX metrics, it might be required to update tools that receive those metrics. -

- -

Producer's client.id naming schema:

- -

[client.Id] is either set via Streams configuration parameter client.id or defaults to [application.id]-[processId] ([processId] is a random UUID).

- -

Notable changes in 0.10.2.1

- -

- Parameter updates in StreamsConfig: -

- - -

Streams API changes in 0.10.2.0

- -

- New methods in KafkaStreams: -

- - -

- Parameter updates in StreamsConfig: -

- - -

Changes in StreamsMetrics interface:

- - -

New methods in TopologyBuilder:

- - -

New methods in KStreamBuilder:

- - -

New joins for KStream:

- - -

Aligned null-key handling for KTable joins:

- - -

New window type Session Windows:

- - -

Changes to TimestampExtractor:

- - -

Relaxed type constraints of many DSL interfaces, classes, and methods (cf. KIP-100).

- -

Streams API changes in 0.10.1.0

- -

Stream grouping and aggregation split into two methods:

- - -

Auto Repartitioning:

- - -

TopologyBuilder:

- - -

DSL: new parameter to specify state store names:

- - -

Windowing:

- +

For Streams API changes in versions older than 2.4.x, please check the 3.9 upgrade document.

Streams API broker compatibility

@@ -1864,7 +1137,7 @@ Kafka Streams API (rows) - 2.1.x and
2.2.x and
2.3.x and
2.4.x and
2.5.x and
2.6.x and
2.7.x and
2.8.x and
3.0.x and
3.1.x and
3.2.x and
3.3.x and
3.4.x and
3.5.x and
3.6.x and
3.7.x and
3.8.x and
3.9.x and
4.0.x + 2.4.x and
2.5.x and
2.6.x and
2.7.x and
2.8.x and
3.0.x and
3.1.x and
3.2.x and
3.3.x and
3.4.x and
3.5.x and
3.6.x and
3.7.x and
3.8.x and
3.9.x and
4.0.x 4.1.x diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 2050db40443..4830fb960b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -287,66 +287,6 @@ public class StreamsConfig extends AbstractConfig { OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS, SINGLE_STORE_SELF_JOIN); - /** - * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}. - */ - @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_0100 = UpgradeFromValues.UPGRADE_FROM_0100.toString(); - - /** - * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}. - */ - @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_0101 = UpgradeFromValues.UPGRADE_FROM_0101.toString(); - - /** - * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}. - */ - @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_0102 = UpgradeFromValues.UPGRADE_FROM_0102.toString(); - - /** - * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}. - */ - @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_0110 = UpgradeFromValues.UPGRADE_FROM_0110.toString(); - - /** - * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}. - */ - @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_10 = UpgradeFromValues.UPGRADE_FROM_10.toString(); - - /** - * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}. - */ - @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_11 = UpgradeFromValues.UPGRADE_FROM_11.toString(); - - /** - * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.0.x}. - */ - @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_20 = UpgradeFromValues.UPGRADE_FROM_20.toString(); - - /** - * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.1.x}. - */ - @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_21 = UpgradeFromValues.UPGRADE_FROM_21.toString(); - - /** - * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.2.x}. - */ - @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_22 = UpgradeFromValues.UPGRADE_FROM_22.toString(); - - /** - * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.3.x}. - */ - @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_23 = UpgradeFromValues.UPGRADE_FROM_23.toString(); - /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.4.x}. 
*/ diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java index 798383980b5..312ef0622af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java @@ -17,16 +17,6 @@ package org.apache.kafka.streams.internals; public enum UpgradeFromValues { - UPGRADE_FROM_0100("0.10.0"), - UPGRADE_FROM_0101("0.10.1"), - UPGRADE_FROM_0102("0.10.2"), - UPGRADE_FROM_0110("0.11.0"), - UPGRADE_FROM_10("1.0"), - UPGRADE_FROM_11("1.1"), - UPGRADE_FROM_20("2.0"), - UPGRADE_FROM_21("2.1"), - UPGRADE_FROM_22("2.2"), - UPGRADE_FROM_23("2.3"), UPGRADE_FROM_24("2.4"), UPGRADE_FROM_25("2.5"), UPGRADE_FROM_26("2.6"), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java index 4964c707d9b..219115c3b93 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java @@ -58,16 +58,6 @@ public class ChangedSerializer implements Serializer>, WrappingNull } switch (UpgradeFromValues.fromString((String) upgradeFrom)) { - case UPGRADE_FROM_0100: - case UPGRADE_FROM_0101: - case UPGRADE_FROM_0102: - case UPGRADE_FROM_0110: - case UPGRADE_FROM_10: - case UPGRADE_FROM_11: - case UPGRADE_FROM_20: - case UPGRADE_FROM_21: - case UPGRADE_FROM_22: - case UPGRADE_FROM_23: case UPGRADE_FROM_24: case UPGRADE_FROM_25: case UPGRADE_FROM_26: diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java index a686692b40a..40bd37c0f60 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -100,16 +100,6 @@ public class KTableRepartitionMap implements KTableRepartitionMapS } switch (UpgradeFromValues.fromString((String) upgradeFrom)) { - case UPGRADE_FROM_0100: - case UPGRADE_FROM_0101: - case UPGRADE_FROM_0102: - case UPGRADE_FROM_0110: - case UPGRADE_FROM_10: - case UPGRADE_FROM_11: - case UPGRADE_FROM_20: - case UPGRADE_FROM_21: - case UPGRADE_FROM_22: - case UPGRADE_FROM_23: case UPGRADE_FROM_24: case UPGRADE_FROM_25: case UPGRADE_FROM_26: diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java index b03b24749e0..064d464d8be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java @@ -77,16 +77,6 @@ public class SubscriptionWrapperSerde extends WrappingNullableSerde parameterizedConfig) { - setUp(parameterizedConfig, false); - builder.addSource(null, "source1", null, null, null, "topic1"); - builder.addSource(null, "source2", null, null, null, "topic2"); - builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1", "source2"); - - final Set prevTasks = Set.of( - new TaskId(0, 1), new 
TaskId(1, 1), new TaskId(2, 1) - ); - final Set standbyTasks = Set.of( - new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2) - ); - - createMockTaskManager(prevTasks, standbyTasks); - assertThrows( - ConfigException.class, - () -> configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23), parameterizedConfig) - ); - } - @ParameterizedTest @MethodSource("parameter") public void testCooperativeSubscription(final Map parameterizedConfig) {