<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
    <!-- h1>Developer Guide for Kafka Streams</h1 -->
    <div class="sub-nav-sticky">
        <div class="sticky-top">
            <!-- div style="height:35px">
                <a href="/{{version}}/documentation/streams/">Introduction</a>
                <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
                <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
                <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
                <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
            </div -->
        </div>
    </div>
    <div class="section" id="processor-api">
        <span id="streams-developer-guide-processor-api"></span><h1>Processor API<a class="headerlink" href="#processor-api" title="Permalink to this headline"></a></h1>
        <p>The Processor API allows developers to define and connect custom processors and to interact with state stores. With the
            Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these
            processors with their associated state stores to compose the processor topology that represents a customized processing
            logic.</p>
        <div class="contents local topic" id="table-of-contents">
            <p class="topic-title first"><b>Table of Contents</b></p>
            <ul class="simple">
                <li><a class="reference internal" href="#overview" id="id1">Overview</a></li>
                <li><a class="reference internal" href="#defining-a-stream-processor" id="id2">Defining a Stream Processor</a></li>
                <li><a class="reference internal" href="#unit-testing-processors" id="id9">Unit Testing Processors</a></li>
                <li><a class="reference internal" href="#state-stores" id="id3">State Stores</a>
                    <ul>
                        <li><a class="reference internal" href="#defining-and-creating-a-state-store" id="id4">Defining and creating a State Store</a></li>
                        <li><a class="reference internal" href="#fault-tolerant-state-stores" id="id5">Fault-tolerant State Stores</a></li>
                        <li><a class="reference internal" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" id="id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a></li>
                        <li><a class="reference internal" href="#timestamped-state-stores" id="id11">Timestamped State Stores</a></li>
                        <li><a class="reference internal" href="#versioned-state-stores" id="id12">Versioned Key-Value State Stores</a></li>
                        <li><a class="reference internal" href="#readonly-state-stores" id="id12">Readonly State Stores</a></li>
                        <li><a class="reference internal" href="#implementing-custom-state-stores" id="id7">Implementing Custom State Stores</a></li>
                    </ul>
                </li>
                <li><a class="reference internal" href="#connecting-processors-and-state-stores" id="id8">Connecting Processors and State Stores</a></li>
                <li><a class="reference internal" href="#accessing-processor-context" id="id10">Accessing Processor Context</a></li>
            </ul>
        </div>
        <div class="section" id="overview">
            <h2><a class="toc-backref" href="#id1">Overview</a><a class="headerlink" href="#overview" title="Permalink to this headline"></a></h2>
            <p>The Processor API can be used to implement both <strong>stateless</strong> as well as <strong>stateful</strong> operations, where the latter is
                achieved through the use of <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a>.</p>
            <div class="admonition tip">
                <p><b>Tip</b></p>
                <p class="last"><strong>Combining the DSL and the Processor API:</strong>
                    You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the
                    section <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-process"><span class="std std-ref">Applying processors and transformers (Processor API integration)</span></a>.</p>
            </div>
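            <p>As a quick illustration, the sketch below hands records from a DSL stream to a custom processor via <code class="docutils literal"><span class="pre">KStream#process()</span></code>. The topic, supplier, and store names used here are hypothetical:</p>
            <pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

final StreamsBuilder builder = new StreamsBuilder();
final KStream&lt;String, String&gt; input = builder.stream("input-topic");

// Plug a Processor API implementation into the DSL topology; the names of any
// state stores the processor accesses are passed after the supplier.
input.process(new MyProcessorSupplier(), "myProcessorStore");</code></pre>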
            <p>For a complete list of available API functionality, see the <a href="/{{version}}/javadoc/org/apache/kafka/streams/package-summary.html">Streams</a> API docs.</p>
        </div>
        <div class="section" id="defining-a-stream-processor">
            <span id="streams-developer-guide-stream-processor"></span><h2><a class="toc-backref" href="#id2">Defining a Stream Processor</a><a class="headerlink" href="#defining-a-stream-processor" title="Permalink to this headline"></a></h2>
            <p>A <a class="reference internal" href="../core-concepts.html#streams_processor_node"><span class="std std-ref">stream processor</span></a> is a node in the processor topology that represents a single processing step.
With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect
these processors with their associated state stores to compose the processor topology.< / p >
< p > You can define a customized stream processor by implementing the < code class = "docutils literal" > < span class = "pre" > Processor< / span > < / code > interface, which provides the < code class = "docutils literal" > < span class = "pre" > process()< / span > < / code > API method.
The < code class = "docutils literal" > < span class = "pre" > process()< / span > < / code > method is called on each of the received records.< / p >
<p>The <code class="docutils literal"><span class="pre">Processor</span></code> interface also has an <code class="docutils literal"><span class="pre">init()</span></code> method, which is called by the Kafka Streams library during the task construction
phase. Processor instances should perform any required initialization in this method. The <code class="docutils literal"><span class="pre">init()</span></code> method is passed a <code class="docutils literal"><span class="pre">ProcessorContext</span></code>
instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
its corresponding message offset, and more. You can also use this context instance to schedule a punctuation
function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>),
and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).
Any resources you set up in < code class = "docutils literal" > < span class = "pre" > init()< / span > < / code > can be cleaned up in the
< code class = "docutils literal" > < span class = "pre" > close()< / span > < / code > method. Note that Kafka Streams may re-use a single
< code class = "docutils literal" > < span class = "pre" > Processor< / span > < / code > object by calling
< code class = "docutils literal" > < span class = "pre" > init()< / span > < / code > on it again after < code class = "docutils literal" > < span class = "pre" > close()< / span > < / code > .< / p >
< p >
The < code class = "docutils literal" > < span class = "pre" > Processor< / span > < / code > interface takes two sets of generic parameters:
< code class = "docutils literal" > < span class = "pre" > KIn, VIn, KOut, VOut< / span > < / code > . These define the input and output types
that the processor implementation can handle. < code class = "docutils literal" > < span class = "pre" > KIn< / span > < / code > and
< code class = "docutils literal" > < span class = "pre" > VIn< / span > < / code > define the key and value types that will be passed
to < code class = "docutils literal" > < span class = "pre" > process()< / span > < / code > .
Likewise, < code class = "docutils literal" > < span class = "pre" > KOut< / span > < / code > and < code class = "docutils literal" > < span class = "pre" > VOut< / span > < / code >
define the forwarded key and value types that < code class = "docutils literal" > < span class = "pre" > ProcessorContext#forward()< / span > < / code >
will accept. If your processor does not forward any records at all (or if it only forwards
< code class = "docutils literal" > < span class = "pre" > null< / span > < / code > keys or values),
a best practice is to set the output generic type argument to
< code class = "docutils literal" > < span class = "pre" > Void< / span > < / code > .
If it needs to forward multiple types that don't share a common superclass, you will
have to set the output generic type argument to < code class = "docutils literal" > < span class = "pre" > Object< / span > < / code > .
< / p >
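<p>As a minimal, illustrative sketch (the class name and its behavior are not part of the API), a terminal processor that never forwards records can declare <code class="docutils literal"><span class="pre">Void</span></code> output types:</p>
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;

// Consumes String/String records and never calls forward(),
// so both output type arguments are declared as Void.
public class RecordLoggingProcessor implements Processor&lt;String, String, Void, Void&gt; {

    @Override
    public void process(final Record&lt;String, String&gt; record) {
        // terminal step: observe the record, forward nothing downstream
        System.out.println("observed record with key " + record.key());
    }
}</code></pre>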
< p >
Both the < code class = "docutils literal" > < span class = "pre" > Processor#process()< / span > < / code >
and the < code class = "docutils literal" > < span class = "pre" > ProcessorContext#forward()< / span > < / code >
methods handle records in the form of the < code class = "docutils literal" > < span class = "pre" > Record< K, V> < / span > < / code >
data class. This class gives you access to the main components of a Kafka record:
the key, value, timestamp and headers. When forwarding records, you can use the
constructor to create a new < code class = "docutils literal" > < span class = "pre" > Record< / span > < / code >
from scratch, or you can use the convenience builder methods to replace one of the
< code class = "docutils literal" > < span class = "pre" > Record< / span > < / code > 's properties
and copy over the rest. For example,
< code class = "docutils literal" > < span class = "pre" > inputRecord.withValue(newValue)< / span > < / code >
would copy the key, timestamp, and headers from
< code class = "docutils literal" > < span class = "pre" > inputRecord< / span > < / code > while
setting the output record's value to < code class = "docutils literal" > < span class = "pre" > newValue< / span > < / code > .
Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>,
but instead creates a shallow copy. Because the copy is only shallow, if you
plan to mutate the key, value, or headers elsewhere in the program, you should
create a deep copy of those fields yourself.
< / p >
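<p>As a rough sketch of these two styles, assuming a processor with <code class="docutils literal"><span class="pre">String</span></code> input and output types (the values used here are illustrative):</p>
<pre class="line-numbers"><code class="language-java">// Fragment: inside process(final Record&lt;String, String&gt; record) of a
// Processor&lt;String, String, String, String&gt;.

// build an output record from scratch, supplying key, value, and timestamp
context.forward(new Record&lt;&gt;(record.key(), "new-value", record.timestamp()));

// copy the input record, replacing only its value; the key, timestamp,
// and headers are carried over from the input (as a shallow copy)
context.forward(record.withValue(record.value().toUpperCase(java.util.Locale.ROOT)));</code></pre>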
< p >
In addition to handling incoming records via
< code class = "docutils literal" > < span class = "pre" > Processor#process()< / span > < / code > ,
you have the option to schedule periodic invocation (called "punctuation")
in your processor's < code class = "docutils literal" > < span class = "pre" > init()< / span > < / code >
method by calling < code class = "docutils literal" > < span class = "pre" > ProcessorContext#schedule()< / span > < / code >
and passing it a < code class = "docutils literal" > < span class = "pre" > Punctuator< / span > < / code > .
The < code class = "docutils literal" > < span class = "pre" > PunctuationType< / span > < / code > determines what notion of time is used
for the punctuation scheduling: either < a class = "reference internal" href = "../core-concepts.html#streams_time" > < span class = "std std-ref" > stream-time< / span > < / a > or wall-clock-time (by default, stream-time
is configured to represent event-time via < code class = "docutils literal" > < span class = "pre" > TimestampExtractor< / span > < / code > ). When stream-time is used, < code class = "docutils literal" > < span class = "pre" > punctuate()< / span > < / code > is triggered purely
by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there
is no new input data arriving, stream-time is not advanced and thus < code class = "docutils literal" > < span class = "pre" > punctuate()< / span > < / code > is not called.< / p >
<p>For example, if you schedule a <code class="docutils literal"><span class="pre">Punctuator</span></code> function every 10 seconds based on <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> and you
process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record),
then <code class="docutils literal"><span class="pre">punctuate()</span></code> would be called 6 times, regardless of whether processing these 60 records takes a second, a minute, or an hour.</p>
<p>When wall-clock-time (i.e., <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely by wall-clock time.
Reusing the example above, if the <code class="docutils literal"><span class="pre">Punctuator</span></code> function is scheduled based on <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code> and these
60 records are processed within 20 seconds, <code class="docutils literal"><span class="pre">punctuate()</span></code> is called 2 times (once every 10 seconds); if they are
processed within 5 seconds, <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called at all. Note that you can schedule multiple <code class="docutils literal"><span class="pre">Punctuator</span></code>
callbacks with different <code class="docutils literal"><span class="pre">PunctuationType</span></code> types within the same processor by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> multiple
times inside the <code class="docutils literal"><span class="pre">init()</span></code> method, as sketched below.</p>
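<p>For instance, a small sketch of an <code class="docutils literal"><span class="pre">init()</span></code> method that registers one punctuator of each type (the intervals and log messages are illustrative):</p>
<pre class="line-numbers"><code class="language-java">@Override
public void init(final ProcessorContext&lt;String, String&gt; context) {
    // fires based on stream-time, i.e., only while new input records advance time
    context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME,
        timestamp -&gt; System.out.println("stream-time punctuation at " + timestamp));

    // fires based on wall-clock time, independent of incoming data
    context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME,
        timestamp -&gt; System.out.println("wall-clock punctuation at " + timestamp));
}</code></pre>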
< div class = "admonition attention" >
< p class = "first admonition-title" > < b > Attention< / b > < / p >
< p class = "last" > Stream-time is only advanced when Streams processes records.
If there are no records to process, or if Streams is waiting for new records
due to the < a class = "reference internal" href = "/documentation/#streamsconfigs_max.task.idle.ms" > Task Idling< / a >
configuration, then the stream time will not advance and < code class = "docutils literal" > < span class = "pre" > punctuate()< / span > < / code > will not be triggered if < code class = "docutils literal" > < span class = "pre" > PunctuationType.STREAM_TIME< / span > < / code > was specified.
This behavior is independent of the configured timestamp extractor, i.e., using < code class = "docutils literal" > < span class = "pre" > WallclockTimestampExtractor< / span > < / code > does not enable wall-clock triggering of < code class = "docutils literal" > < span class = "pre" > punctuate()< / span > < / code > .< / p >
< / div >
< p > < b > Example< / b > < / p >
<p>The following example <code class="docutils literal"><span class="pre">Processor</span></code> implements a simple word-count algorithm and performs the following actions:</p>
< ul class = "simple" >
<li>In the <code class="docutils literal"><span class="pre">init()</span></code> method, schedule the punctuation every second (<code class="docutils literal"><span class="pre">Duration.ofSeconds(1)</span></code>) and retrieve the local state store by its name "Counts".</li>
<li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words and update their counts in the state store (we will talk about this later in this section).</li>
<li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate over the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section), and commit the current stream state.</li>
< / ul >
<pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String, String, String&gt; {
    private KeyValueStore&lt;String, Integer&gt; kvStore;

    @Override
    public void init(final ProcessorContext&lt;String, String&gt; context) {
        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -&gt; {
            try (final KeyValueIterator&lt;String, Integer&gt; iter = kvStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue&lt;String, Integer&gt; entry = iter.next();
                    context.forward(new Record&lt;&gt;(entry.key, entry.value.toString(), timestamp));
                }
            }
        });
        kvStore = context.getStateStore("Counts");
    }

    @Override
    public void process(final Record&lt;String, String&gt; record) {
        final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");

        for (final String word : words) {
            final Integer oldValue = kvStore.get(word);

            if (oldValue == null) {
                kvStore.put(word, 1);
            } else {
                kvStore.put(word, oldValue + 1);
            }
        }
    }

    @Override
    public void close() {
        // close any resources managed by this processor
        // Note: Do not close any StateStores as these are managed by the library
    }
}</code></pre>
< div class = "admonition note" >
< p > < b > Note< / b > < / p >
< p class = "last" > < strong > Stateful processing with state stores:< / strong >
The < code class = "docutils literal" > < span class = "pre" > WordCountProcessor< / span > < / code > defined above can access the currently received record in its < code class = "docutils literal" > < span class = "pre" > process()< / span > < / code > method, and it can
leverage < a class = "reference internal" href = "#streams-developer-guide-state-store" > < span class = "std std-ref" > state stores< / span > < / a > to maintain processing states to, for example, remember recently
arrived records for stateful processing needs like aggregations and joins. For more information, see the < a class = "reference internal" href = "#streams-developer-guide-state-store" > < span class = "std std-ref" > state stores< / span > < / a > documentation.< / p >
< / div >
< / div >
< div class = "section" id = "unit-testing-processors" >
< h2 >
< a class = "toc-backref" href = "#id9" > Unit Testing Processors< / a >
< a class = "headerlink" href = "#unit-testing-processors" title = "Permalink to this headline" > < / a >
< / h2 >
<p>
Kafka Streams comes with a <code>test-utils</code> module to help you write unit tests for your
processors; see <a href="testing.html#unit-testing-processors">Testing Streams code</a> for details.
</p>
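<p>For instance, a unit test for the <code>WordCountProcessor</code> shown above might look roughly like the following sketch (the topic names, store serdes, configuration values, and the use of <code>System.out</code> instead of a test assertion are illustrative):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

Topology topology = new Topology();
topology.addSource("Source", new StringDeserializer(), new StringDeserializer(), "input-topic");
topology.addProcessor("Process", WordCountProcessor::new, "Source");
topology.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("Counts"),
        Serdes.String(),
        Serdes.Integer()),
    "Process");

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
    TestInputTopic&lt;String, String&gt; input =
        driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
    input.pipeInput("key", "hello world hello");

    // verify the word counts accumulated in the state store
    KeyValueStore&lt;String, Integer&gt; counts = driver.getKeyValueStore("Counts");
    System.out.println(counts.get("hello")); // prints 2
}</code></pre>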
< / div >
< div class = "section" id = "state-stores" >
< span id = "streams-developer-guide-state-store" > < / span > < h2 > < a class = "toc-backref" href = "#id3" > State Stores< / a > < a class = "headerlink" href = "#state-stores" title = "Permalink to this headline" > < / a > < / h2 >
< p > To implement a < strong > stateful< / strong > < code class = "docutils literal" > < span class = "pre" > Processor< / span > < / code > or < code class = "docutils literal" > < span class = "pre" > Transformer< / span > < / code > , you must provide one or more state stores to the processor
or transformer (< em > stateless< / em > processors or transformers do not need state stores). State stores can be used to remember
recently received input records, to track rolling aggregates, to de-duplicate input records, and more.
Another feature of state stores is that they can be
< a class = "reference internal" href = "interactive-queries.html#streams-developer-guide-interactive-queries" > < span class = "std std-ref" > interactively queried< / span > < / a > from other applications, such as a
NodeJS-based dashboard or a microservice implemented in Scala or Go.< / p >
< p > The
< a class = "reference internal" href = "#streams-developer-guide-state-store-defining" > < span class = "std std-ref" > available state store types< / span > < / a > in Kafka Streams have
< a class = "reference internal" href = "#streams-developer-guide-state-store-fault-tolerance" > < span class = "std std-ref" > fault tolerance< / span > < / a > enabled by default.< / p >
< div class = "section" id = "defining-and-creating-a-state-store" >
< span id = "streams-developer-guide-state-store-defining" > < / span > < h3 > < a class = "toc-backref" href = "#id4" > Defining and creating a State Store< / a > < a class = "headerlink" href = "#defining-and-creating-a-state-store" title = "Permalink to this headline" > < / a > < / h3 >
< p > You can either use one of the available store types or
< a class = "reference internal" href = "#streams-developer-guide-state-store-custom" > < span class = "std std-ref" > implement your own custom store type< / span > < / a > .
It’s common practice to leverage an existing store type via the <code class="docutils literal"><span class="pre">Stores</span></code> factory.</p>
<p>Note that, when using Kafka Streams, you normally don’t create or instantiate state stores directly in your code.
Rather, you define state stores indirectly by creating a so-called < code class = "docutils literal" > < span class = "pre" > StoreBuilder< / span > < / code > . This builder is used by
Kafka Streams as a factory to instantiate the actual state stores locally in application instances when and where
needed.< / p >
< p > The following store types are available out of the box.< / p >
< table border = "1" class = "non-scrolling-table width-100-percent docutils" >
< colgroup >
< col width = "19%" / >
< col width = "11%" / >
< col width = "18%" / >
< col width = "51%" / >
< / colgroup >
< thead valign = "bottom" >
< tr class = "row-odd" > < th class = "head" > Store Type< / th >
< th class = "head" > Storage Engine< / th >
< th class = "head" > Fault-tolerant?< / th >
< th class = "head" > Description< / th >
< / tr >
< / thead >
< tbody valign = "top" >
< tr class = "row-even" > < td > Persistent
< code class = "docutils literal" > < span class = "pre" > KeyValueStore< K,< / span > < span class = "pre" > V> < / span > < / code > < / td >
< td > RocksDB< / td >
< td > Yes (enabled by default)< / td >
< td > < ul class = "first simple" >
< li > < strong > The recommended store type for most use cases.< / strong > < / li >
< li > Stores its data on local disk.< / li >
< li > Storage capacity:
managed local state can be larger than the memory (heap space) of an
application instance, but must fit into the available local disk
space.< / li >
< li > RocksDB settings can be fine-tuned, see
< a class = "reference internal" href = "config-streams.html#streams-developer-guide-rocksdb-config" > < span class = "std std-ref" > RocksDB configuration< / span > < / a > .< / li >
< li > Available < a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore(java.lang.String)" > store variants< / a > :
timestamped key-value store, versioned key-value store, time window key-value store, session window key-value store.< / li >
< li > Use < a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore(java.lang.String)" > persistentTimestampedKeyValueStore< / a >
when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries.< / li >
< li > Use < a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration)" > persistentVersionedKeyValueStore< / a >
when you need a persistent, versioned key-(value/timestamp) store that supports put/get/delete and timestamped get operations.< / li >
< li > Use < a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore(java.lang.String,java.time.Duration,java.time.Duration,boolean)" > persistentWindowStore< / a >
or < a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore(java.lang.String,java.time.Duration,java.time.Duration,boolean)" > persistentTimestampedWindowStore< / a >
when you need a persistent timeWindowedKey-value or timeWindowedKey-(value/timestamp) store, respectively.< / li >
< li > Use < a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore(java.lang.String,java.time.Duration)" > persistentSessionStore< / a >
when you need a persistent sessionWindowedKey-value store.< / li >
< / ul >
<pre class="line-numbers"><code class="language-java">// Creating a persistent key-value store:
// here, we create a `KeyValueStore&lt;String, Long&gt;` named "persistent-counts".
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.
StoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; countStoreSupplier =
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("persistent-counts"),
    Serdes.String(),
    Serdes.Long());
KeyValueStore&lt;String, Long&gt; countStore = countStoreSupplier.build();</code></pre>
< / td >
< / tr >
< tr class = "row-odd" > < td > In-memory
< code class = "docutils literal" > < span class = "pre" > KeyValueStore< K,< / span > < span class = "pre" > V> < / span > < / code > < / td >
< td > -< / td >
< td > Yes (enabled by default)< / td >
< td > < ul class = "first simple" >
< li > Stores its data in memory.< / li >
< li > Storage capacity:
managed local state must fit into memory (heap space) of an
application instance.< / li >
< li > Useful when application instances run in an environment where local
disk space is either not available or local disk space is wiped
in-between app instance restarts.< / li >
< li > Available < a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-" > store variants< / a > :
time window key-value store, session window key-value store.< / li >
< li > Use < a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html" > TimestampedKeyValueStore< / a >
when you need a key-(value/timestamp) store that supports put/get/delete and range queries.< / li >
< li > Use < a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html" > TimestampedWindowStore< / a >
when you need to store windowedKey-(value/timestamp) pairs.< / li >
< li > There is no built-in in-memory, versioned key-value store at this time.< / li >
< / ul >
<pre class="line-numbers"><code class="language-java">// Creating an in-memory key-value store:
// here, we create a `KeyValueStore&lt;String, Long&gt;` named "inmemory-counts".
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.
StoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; countStoreSupplier =
  Stores.keyValueStoreBuilder(
    Stores.inMemoryKeyValueStore("inmemory-counts"),
    Serdes.String(),
    Serdes.Long());
KeyValueStore&lt;String, Long&gt; countStore = countStoreSupplier.build();</code></pre>
< / td >
< / tr >
< / tbody >
< / table >
< / div >
< div class = "section" id = "fault-tolerant-state-stores" >
< span id = "streams-developer-guide-state-store-fault-tolerance" > < / span > < h3 > < a class = "toc-backref" href = "#id5" > Fault-tolerant State Stores< / a > < a class = "headerlink" href = "#fault-tolerant-state-stores" title = "Permalink to this headline" > < / a > < / h3 >
<p>To make state stores fault-tolerant and to allow for state store migration without data loss, a state store can be
continuously backed up to a Kafka topic behind the scenes. This allows, for example, a stateful stream task to be migrated from one
machine to another when <a class="reference internal" href="running-app.html#streams-developer-guide-execution-scaling"><span class="std std-ref">elastically adding or removing capacity from your application</span></a>.
This topic is sometimes referred to as the state store’s associated <em>changelog topic</em>, or its <em>changelog</em>. For example, if
you experience machine failure, the state store and the application’s state can be fully restored from its changelog. You can
< a class = "reference internal" href = "#streams-developer-guide-state-store-enable-disable-fault-tolerance" > < span class = "std std-ref" > enable or disable this backup feature< / span > < / a > for a
state store.< / p >
< p > Fault-tolerant state stores are backed by a
< a class = "reference external" href = "https://kafka.apache.org/documentation.html#compaction" > compacted< / a > changelog topic. The purpose of compacting this
topic is to prevent the topic from growing indefinitely, to reduce the storage consumed in the associated Kafka cluster,
and to minimize recovery time if a state store needs to be restored from its changelog topic.< / p >
< p > Fault-tolerant windowed state stores are backed by a topic that uses both compaction and
deletion. Because of the structure of the message keys that are being sent to the changelog topics, this combination of
deletion and compaction is required for the changelog topics of window stores. For window stores, the message keys are
composite keys that include the “ normal” key and window timestamps. For these types of composite keys it would not
be sufficient to only enable compaction to prevent a changelog topic from growing out of bounds. With deletion
enabled, old windows that have expired will be cleaned up by Kafka’s log cleaner as the log segments expire. The
default retention setting is < code class = "docutils literal" > < span class = "pre" > Windows#maintainMs()< / span > < / code > + 1 day. You can override this setting by specifying
< code class = "docutils literal" > < span class = "pre" > StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG< / span > < / code > in the < code class = "docutils literal" > < span class = "pre" > StreamsConfig< / span > < / code > .< / p >
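<p>As a brief sketch, overriding this retention buffer via the streams configuration might look as follows (the one-day value shown is illustrative):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// keep window-store changelog records for an extra day (in milliseconds)
// beyond the window retention itself
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 24 * 60 * 60 * 1000L);</code></pre>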
<p>When you open an <code class="docutils literal"><span class="pre">Iterator</span></code> from a state store, you must call <code class="docutils literal"><span class="pre">close()</span></code> on the iterator when you are done working with
it to reclaim resources, or use the iterator from within a try-with-resources statement. If you do not close an iterator,
you may encounter an OOM error.</p>
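<p>For example, a minimal sketch of the try-with-resources pattern, assuming an already-open <code class="docutils literal"><span class="pre">KeyValueStore&lt;String, Long&gt;</span></code> named <code class="docutils literal"><span class="pre">countStore</span></code> (the store and its contents are hypothetical):</p>
<pre class="line-numbers"><code class="language-java">// the iterator is closed automatically when the try block exits,
// reclaiming its underlying resources (e.g., RocksDB iterators)
try (final KeyValueIterator&lt;String, Long&gt; iter = countStore.all()) {
    while (iter.hasNext()) {
        final KeyValue&lt;String, Long&gt; entry = iter.next();
        System.out.println(entry.key + " -&gt; " + entry.value);
    }
}</code></pre>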
< / div >
< div class = "section" id = "enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" >
< span id = "streams-developer-guide-state-store-enable-disable-fault-tolerance" > < / span > < h3 > < a class = "toc-backref" href = "#id6" > Enable or Disable Fault Tolerance of State Stores (Store Changelogs)< / a > < a class = "headerlink" href = "#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" title = "Permalink to this headline" > < / a > < / h3 >
< p > You can enable or disable fault tolerance for a state store by enabling or disabling the change logging
of the store through < code class = "docutils literal" > < span class = "pre" > enableLogging()< / span > < / code > and < code class = "docutils literal" > < span class = "pre" > disableLogging()< / span > < / code > .
You can also fine-tune the associated topic’s configuration if needed.</p>
< p > Example for disabling fault-tolerance:< / p >
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; countStoreSupplier = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("Counts"),
  Serdes.String(),
  Serdes.Long())
  .withLoggingDisabled(); // disable backing up the store to a changelog topic</code></pre>
< div class = "admonition attention" >
< p class = "first admonition-title" > Attention< / p >
< p class = "last" > If the changelog is disabled then the attached state store is no longer fault tolerant and it can’ t have any < a class = "reference internal" href = "config-streams.html#streams-developer-guide-standby-replicas" > < span class = "std std-ref" > standby replicas< / span > < / a > .< / p >
< / div >
< p > Here is an example for enabling fault tolerance, with additional changelog-topic configuration:
You can add any log config from < a class = "reference external" href = "https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogConfig.scala" > kafka.log.LogConfig< / a > .
Unrecognized configs will be ignored.< / p >
<pre class="line-numbers"><code class="language-java">import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

Map&lt;String, String&gt; changelogConfig = new HashMap&lt;&gt;();
// override min.insync.replicas
changelogConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1");

StoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; countStoreSupplier = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("Counts"),
  Serdes.String(),
  Serdes.Long())
  .withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings</code></pre>
< / div >
< div class = "section" id = "timestamped-state-stores" >
< span id = "streams-developer-guide-state-store-timestamps" > < / span > < h3 > < a class = "toc-backref" href = "#id11" > Timestamped State Stores< / a > < a class = "headerlink" href = "#timestamped-state-stores" title = "Permalink to this headline" > < / a > < / h3 >
<p>KTables always store timestamps.
A timestamped state store improves stream processing semantics and enables
handling out-of-order data in source KTables, detecting out-of-order joins and aggregations,
and getting the timestamp of the latest update in an Interactive Query.< / p >
< p > You can query timestamped state stores both with and without a timestamp.< / p >
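<p>For example, a minimal sketch of building a persistent timestamped key-value store (the store name is illustrative):</p>
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;

StoreBuilder&lt;TimestampedKeyValueStore&lt;String, Long&gt;&gt; storeBuilder =
  Stores.timestampedKeyValueStoreBuilder(
    Stores.persistentTimestampedKeyValueStore("ts-counts"),
    Serdes.String(),
    Serdes.Long());

// reads return the value together with the timestamp of its latest update:
// ValueAndTimestamp&lt;Long&gt; valueAndTimestamp = store.get("some-key");</code></pre>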
<p><b>Upgrade note:</b> All users can upgrade with a single rolling bounce per instance.</p>
< ul class = "first simple" >
< li > For Processor API users, nothing changes in existing applications, and you
have the option of using the timestamped stores.< / li >
< li > For DSL operators, store data is upgraded lazily in the background.< / li >
< li > No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt-in
by implementing the < a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html" > TimestampedBytesStore< / a >
interface. In this case, the old format is retained, and Streams uses a proxy store
that removes/adds timestamps on read/write.< / li >
</ul>
< / div >
< div class = "section" id = "versioned-state-stores" >
< span id = "streams-developer-guide-state-store-versioned" > < / span > < h3 > < a class = "toc-backref" href = "#id12" > Versioned Key-Value State Stores< / a > < a class = "headerlink" href = "#versioned-state-stores" title = "Permalink to this headline" > < / a > < / h3 >
< p > Versioned key-value state stores are available since Kafka Streams 3.5.
Rather than storing a single record version (value and timestamp) per key,
versioned state stores may store multiple record versions per key. This
allows versioned state stores to support timestamped retrieval operations
to return the latest record (per key) as of a specified timestamp.< / p >
< p > You can create a persistent, versioned state store by passing a
< a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration)" > VersionedBytesStoreSupplier< / a >
to the
< a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#versionedKeyValueStoreBuilder(java.lang.String,java.time.Duration)" > versionedKeyValueStoreBuilder< / a > ,
or by implementing your own
< a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/VersionedKeyValueStore.html" > VersionedKeyValueStore< / a > .< / p >
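<p>For example, a minimal sketch of building a versioned store with a seven-day history retention (the store name and retention duration are illustrative):</p>
<pre class="line-numbers"><code class="language-java">import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.VersionedKeyValueStore;

StoreBuilder&lt;VersionedKeyValueStore&lt;String, Long&gt;&gt; storeBuilder =
  Stores.versionedKeyValueStoreBuilder(
    Stores.persistentVersionedKeyValueStore("versioned-counts", Duration.ofDays(7)),
    Serdes.String(),
    Serdes.Long());

// timestamped retrieval: the latest record version for a key as of a given timestamp
// VersionedRecord&lt;Long&gt; versionedRecord = store.get("some-key", asOfTimestampMs);</code></pre>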
<p>Each versioned store has an associated, fixed-duration <em>history retention</em>
parameter, which specifies how long old record versions should be kept for.
In particular, a versioned store guarantees to return accurate results for
timestamped retrieval operations where the timestamp being queried is within
history retention of the current observed stream time.< / p >
< p > History retention also doubles as its < em > grace period< / em > , which determines
how far back in time out-of-order writes to the store will be accepted. A
versioned store will not accept writes (inserts, updates, or deletions) if
the timestamp associated with the write is older than the current observed
stream time by more than the grace period. Stream time in this context is
tracked per-partition, rather than per-key, which means it's important
that the grace period (i.e., history retention) be set high enough to
accommodate a record with one key arriving out-of-order relative to a
record for another key.< / p >
< p > Because the memory footprint of versioned key-value stores is higher than
that of non-versioned key-value stores, you may want to adjust your
< a class = "reference internal" href = "memory-mgmt.html#streams-developer-guide-memory-management-rocksdb" > < span class = "std std-ref" > RocksDB memory settings< / span > < / a >
accordingly. Benchmarking your application with versioned stores is also
advised as performance is expected to be worse than when using non-versioned
stores.< / p >
< p > Versioned stores do not support caching or interactive queries at this time.
Also, window stores and global tables may not be versioned.< / p >
<p><b>Upgrade note:</b> Versioned state stores are opt-in only; no automatic
upgrades from non-versioned to versioned stores will take place.</p>
< p > Upgrades are supported from persistent, non-versioned key-value stores
to persistent, versioned key-value stores as long as the original store
has the same changelog topic format as the versioned store being upgraded
to. Both persistent
< a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore(java.lang.String)" > key-value stores< / a >
and < a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore(java.lang.String)" > timestamped key-value stores< / a >
share the same changelog topic format as
< a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore(java.lang.String,java.time.Duration)" > persistent versioned key-value stores< / a > ,
and therefore both are eligible for upgrades.< / p >
< p > If you wish to upgrade an application using persistent, non-versioned
key-value stores to use persistent, versioned key-value stores
instead, you can perform the following procedure:< / p >
< ul class = "first simple" >
< li > Stop all application instances, and
< a class = "reference internal" href = "app-reset-tool.html#streams-developer-guide-reset-local-environment" > < span class = "std std-ref" > clear any local state directories< / span > < / a >
for the store(s) being upgraded.< / li >
< li > Update your application code to use versioned stores where desired.< / li >
< li > Update your changelog topic configs, for the relevant state stores,
to set the value of < code class = "docutils literal" > < span class = "pre" > min.compaction.lag.ms< / span > < / code >
to be at least your desired history retention. History retention plus
one day is recommended as a buffer for the use of broker wall-clock time
during compaction.< / li >
<li>Restart your application instances and allow time for the versioned
stores to rebuild state from the changelog.</li>
< / ul >
</div>
the partition being unassigned by the background thread before
the subscription state is accessed during initialisation.
This commit avoids the IllegalStateException by verifying that
the partition was not unassigned each time the subscription state
is accessed.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
* MINOR: Tweak streams config doc (#15518)
Reviewers: Matthias J. Sax <matthias@confluent.io>
* MINOR: Resolve SSLContextFactory.getNeedClientAuth deprecation (#15468)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
* MINOR; Make string from array (#15526)
If toString is called on an array it returns the string representing the object reference. Use mkString instead to print the content of the array.
Reviewers: Luke Chen <showuon@gmail.com>, Justine Olshan <jolshan@confluent.io>, Lingnan Liu <liliu@confluent.io>
* MINOR: simplify consumer logic (#15519)
For static member, the `group.instance.id` cannot change.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Lianet Magrans <lianetmr@gmail.com>, David Jacot <david.jacot@gmail.com>
* MINOR: Kafka Streams docs fixes (#15517)
- add missing section to TOC
- add default value for client.id
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bruno Cadonna <bruno@confluent.io>
* KAFKA-16249; Improve reconciliation state machine (#15364)
This patch re-work the reconciliation state machine on the server side with the goal to fix a few issues that we have recently discovered.
* When a member acknowledges the revocation of partitions (by not reporting them in the heartbeat), the current implementation may miss it. The issue is that the current implementation re-compute the assignment of a member whenever there is a new target assignment installed. When it happens, it does not consider the reported owned partitions at all. As the member is supposed to only report its own partitions when they change, the member is stuck.
* Similarly, as the current assignment is re-computed whenever there is a new target assignment, the rebalance timeout, as it is currently implemented, becomes useless. The issue is that the rebalance timeout is reset whenever the member enters the revocation state. In other words, in the current implementation, the timer is reset when there are no target available even if the previous revocation is not completed yet.
The patch fixes these two issues by not automatically recomputing the assignment of a member when a new target assignment is available. When the member must revoke partitions, the coordinator waits. Otherwise, it recomputes the next assignment. In other words, revoking is really blocking now.
The patch also proposes to include an explicit state in the record. It makes the implementation cleaner and it also makes it more extensible in the future.
The patch also changes the record format. This is a non-backward compatible change. I think that we should do this change to cleanup the record. As KIP-848 is only in early access in 3.7 and that we clearly state that we don't plane to support upgrade from it, this is acceptable in my opinion.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
* KAFKA-13922: Adjustments for jacoco, coverage reporting (#11982)
Jacoco and scoverage reporting hasn't been working for a while. This commit fixes report generation. After this PR only subproject level reports are generated as Jenkins and Sonar only cares about that.
This PR doesn't change Kafka's Jenkinsfile.
Reviewers: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
* MINOR: AddPartitionsToTxnManager performance optimizations (#15454)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Justine Olshan <jolshan@confluent.io>
* KAFKA-14683 Cleanup WorkerSinkTaskTest (#15506)
1) Rename WorkerSinkTaskMockitoTest back to WorkerSinkTaskTest
2) Tidy up the code a bit
3) rewrite "fail" by "assertThrow"
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
* KAFKA-16342 fix getOffsetByMaxTimestamp for compressed records (#15474)
Fix getOffsetByMaxTimestamp for compressed records.
This PR adds:
1) For inPlaceAssignment case, compute the correct offset for maxTimestamp when traversing the batch records, and set to ValidationResult in the end, instead of setting to last offset always.
2) For not inPlaceAssignment, set the offsetOfMaxTimestamp for the log create time, like non-compressed, and inPlaceAssignment cases, instead of setting to last offset always.
3) Add tests to verify the fix.
Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
* KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test (#15523)
It is possible that due to resource constraint, ShutdownableThread#run might be called later than the ShutdownableThread#close method.
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>
* MINOR: Update javadocs and exception string in "deprecated" ProcessorRecordContext#hashcode (#15508)
This PR updates the javadocs for the "deprecated" hashCode() method of ProcessorRecordContext, as well as the UnsupportedOperationException thrown in its implementation, to actually explain why the class is mutable and therefore unsafe for use in hash collections. They now point out the mutable field in the class (namely the Headers)
Reviewers: Matthias Sax <mjsax@apache.org>, Bruno Cadonna <cadonna@apache.org>
* KAFKA-16358: Sort transformations by name in documentation; add missing transformations to documentation; add hyperlinks (#15499)
Reviewers: Yash Mayya <yash.mayya@gmail.com>
* MINOR: Only enable replay methods to modify timeline data structure (#15528)
The patch prevents the main method (the method generating records) from modifying the timeline data structure `groups` by calling `getOrMaybeCreateConsumerGroup` in kip-848 new group coordinator. Only replay methods are able to add the newly created group to `groups`.
Reviewers: David Jacot <djacot@confluent.io>
* KAFKA-16231: Update consumer_test.py to support KIP-848’s group protocol config (#15330)
Added a new optional group_protocol parameter to the test methods, then passed that down to the setup_consumer method.
Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
* MINOR: Cleanup BoundedList to Make Constructors More Safe (#15507)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
* KAFKA-16267: Update consumer_group_command_test.py to support KIP-848’s group protocol config (#15537)
* KAFKA-16267: Update consumer_group_command_test.py to support KIP-848’s group protocol config
Added a new optional group_protocol parameter to the test methods, then passed that down to the setup_consumer method.
Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢
Note: this requires #15330.
* Update consumer_group_command_test.py
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
* KAFKA-16268: Update fetch_from_follower_test.py to support KIP-848’s group protocol config (#15539)
Added a new optional `group_protocol` parameter to the test methods, then passed that down to the `setup_consumer` method.
Unfortunately, because the new consumer can only be used with the new coordinator, this required a new `@matrix` block instead of adding the `group_protocol=["classic", "consumer"]` to the existing blocks 😢
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
* KAFKA-16269: Update reassign_partitions_test.py to support KIP-848’s group protocol config (#15540)
Added a new optional `group_protocol` parameter to the test methods, then passed that down to the `setup_consumer` method.
Unfortunately, because the new consumer can only be used with the new coordinator, this required a new `@matrix` block instead of adding the `group_protocol=["classic", "consumer"]` to the existing blocks 😢
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
* KAFKA-16270: Update snapshot_test.py to support KIP-848’s group protocol config (#15538)
Added a new optional `group_protocol` parameter to the test methods, then passed that down to the `setup_consumer` method.
Unfortunately, because the new consumer can only be used with the new coordinator, this required a new `@matrix` block instead of adding the `group_protocol=["classic", "consumer"]` to the existing blocks 😢
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
* KAFKA-16190: Member should send full heartbeat when rejoining (#15401)
When the consumer rejoins, heartbeat request builder make sure that all fields are sent in the heartbeat request.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
* MINOR: fix flaky EosIntegrationTest (#15494)
Bumping some timeout due to slow Jenkins build.
Reviewers: Bruno Cadonna <bruno@confluent.io>
* MINOR: Remove unused client side assignor fields/classes (#15545)
In https://github.com/apache/kafka/pull/15364, we introduced, thoughtfully, a non-backward compatible record change for the new consumer group protocol. So it is a good opportunity for cleaning unused fields, mainly related to the client side assignor logic which is not implemented yet. It is better to introduce them when we need them and more importantly when we implement it.
Note that starting from 3.8, we won't make such changes anymore. Non-backward compatible changes are still acceptable now because we clearly said that upgrade won't be supported from the KIP-848 EA.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
* KAFKA-16369: Broker may not shut down when SocketServer fails to bind as Address already in use (#15530)
* KAFKA-16369: wait on enableRequestProcessingFuture
Add a Wait in in KafkaServer (ZK mode) for all the SocketServer ports
to be open, and the Acceptors to be started
The BrokerServer (KRaft mode) had such a wait,
which was missing from the KafkaServer (ZK mode).
Add unit test.
* KAFKA-16312, KAFKA-16185: Local epochs in reconciliation (#15511)
The goal of this commit is to change the following internals of the reconciliation:
- Introduce a "local epoch" to the local target assignment. When a new target is received by the server, we compare it with the current value. If it is the same, no change. Otherwise, we bump the local epoch and store the new target assignment. Then, on the reconciliation, we also store the epoch in the reconciled assignment and keep using target != current to trigger the reconciliation.
- When we are not in a group (we have not received an assignment), we use null to represent the local target assignment instead of an empty list, to avoid confusions with an empty assignment received by the server. Similarly, we use null to represent the current assignment, when we haven't reconciled the assignment yet.
We also carry the new epoch into the request builder to ensure that we report the owned partitions for the last local epoch.
- To address KAFKA-16312 (call onPartitionsAssigned on empty assignments after joining), we apply the initial assignment returned by the group coordinator (whether empty or not) as a normal reconciliation. This avoids introducing another code path to trigger rebalance listeners - reconciliation is the only way to transition to STABLE. The unneeded parts of reconciliation (autocommit, revocation) will be skipped in the existing. Since a lot of unit tests assumed that not reconciliation behavior is invoked when joining the group with an empty assignment, this required a lot of the changes in the unit tests.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, David Jacot <djacot@confluent.io>
* MINOR; Log reason for deleting a kraft snapshot (#15478)
There are three reasons why KRaft would delete a snapshot. One, it is older than the retention time. Two, the total number of bytes between the log and the snapshot excess the configuration. Three, the latest snapshot is newer than the log.
This change allows KRaft to log the exact reason why a snapshot is getting deleted.
Reviewers: David Arthur <mumrah@gmail.com>, Hailey Ni <hni@confluent.io>
* KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort state (#15524)
Now the removal of entries from the transactionsWithPendingMarkers map
checks the value and all pending marker operations keep the value along
with the operation state. This way, the pending marker operation can
only delete the state it created and wouldn't accidentally delete the
state from a different epoch (which could lead to "stuck" transactions).
Reviewers: Justine Olshan <jolshan@confluent.io>
* KAFKA-16341 fix the LogValidator for non-compressed type (#15476)
- Fix the verifying logic. If it's LOG_APPEND_TIME, we choose the offset of the first record. Else, we choose the record with the maxTimeStamp.
- rename the shallowOffsetOfMaxTimestamp to offsetOfMaxTimestamp
Reviewers: Jun Rao <junrao@gmail.com>, Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
* KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received (#15533)
This patch fixes a bug in the logic which decides when a full ConsumerGroupHeartbeat response must be returned to the client. Prior to it, the logic only relies on the `ownedTopicPartitions` field to check whether the response was a full response. This is not enough because `ownedTopicPartitions` is also set in different situations. This patch changes the logic to check `ownedTopicPartitions`, `subscribedTopicNames` and `rebalanceTimeoutMs` as they are the only three non optional fields.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
* KAFKA-12187 replace assertTrue(obj instanceof X) with assertInstanceOf (#15512)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
* MINOR: Update upgrade docs to refer 3.6.2 version
* KAFKA-16222: desanitize entity name when migrate client quotas (#15481)
The entity name is sanitized when it's in Zk mode.
We didn't desanitize it when we migrate client quotas. Add Sanitizer.desanitize to fix it.
Reviewers: Luke Chen <showuon@gmail.com>
* KAFKA-14589 ConsumerGroupCommand rewritten in java (#14471)
This PR contains changes to rewrite ConsumerGroupCommand in java and transfer it to tools module
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
* KAFKA-16313: Offline group protocol migration (#15546)
This patch enables an empty classic group to be automatically converted to a new consumer group and vice versa.
Reviewers: David Jacot <djacot@confluent.io>
* KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions (#15562)
Reviewers: Yash Mayya <yash.mayya@gmail.com>
* MINOR : Removed the depreciated information about Zk to Kraft migration. (#15552)
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
* KAFKA-16318 : add javafoc for kafka metric (#15483)
Add the javadoc for KafkaMetric
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
* KAFKA-16206: Fix unnecessary topic config deletion during ZK migration (#14206)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ron Dagostino <rndgstn@gmail.com>
* KAFKA-16273: Update consumer_bench_test.py to use consumer group protocol (#15548)
Adding this as part of the greater effort to modify the system tests to incorporate the use of consumer group protocol from KIP-848. Following is the test results and the tests using protocol = consumer are expected to fail:
================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.11.4
session_id: 2024-03-16--002
run time: 76 minutes 36.150 seconds
tests run: 28
passed: 25
flaky: 0
failed: 3
ignored: 0
================================================================================
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Kirk True <ktrue@confluent.io>
* MINOR: KRaft upgrade tests should only use latest stable mv (#15566)
This should help us avoid testing MVs before they are usable (stable).
We revert back from testing 3.8 in this case since 3.7 is the current stable version.
Reviewers: Proven Provenzano <pprovenzano@confluent.io>, Justine Olshan <jolshan@confluent.io>
* KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito (#15254)
This pull requests migrates the StateDirectory mock in TaskManagerTest from EasyMock to Mockito.
The change is restricted to a single mock to minimize the scope and make it easier for review.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Bruno Cadonna <cadonna@apache.org>
* KAFKA-16271: Upgrade consumer_rolling_upgrade_test.py (#15578)
Upgrading the test to use the consumer group protocol. The two tests are failing due to Mismatch Assignment
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
* KAFKA-16274: Update replica_scale_test.py to support KIP-848’s group protocol config (#15577)
Added a new optional group_protocol parameter to the test methods, then passed that down to the methods involved.
Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
* KAFKA-16276: Update transactions_test.py to support KIP-848’s group protocol config (#15567)
Added a new optional group_protocol parameter to the test methods, then passed that down to the methods involved.
Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
* KAFKA-16314: Introducing the AbortableTransactionException (#15486)
As a part of KIP-890, we are introducing a new class of Exceptions which when encountered shall lead to Aborting the ongoing Transaction. The following PR introduces the same with client side handling and server side changes.
On client Side, the code attempts to handle the exception as an Abortable error and ensure that it doesn't take the producer to a fatal state. For each of the Transactional APIs, we have added the appropriate handling. For the produce request, we have verified that the exception transitions the state to Aborted.
On the server side, we have bumped the ProduceRequest, ProduceResponse, TxnOffestCommitRequest and TxnOffsetCommitResponse Version. The appropriate handling on the server side has been added to ensure that the new error case is sent back only for the new clients. The older clients will continue to get the old Invalid_txn_state exception to maintain backward compatibility.
Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
* KAFKA-16381 use volatile to guarantee KafkaMetric#config visibility across threads (#15550)
Reviewers: vamossagar12 <sagarmeansocean@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
* MINOR: Tuple2 replaced with Map.Entry (#15560)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
* KAFKA-16388 add production-ready test of 3.3 - 3.6 release to MetadataVersionTest.testFromVersionString (#15563)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
* KAFKA-16408 kafka-get-offsets / GetOffsetShell doesn't handle --version or --help (#15583)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
* KAFKA-16410 kafka-leader-election / LeaderElectionCommand doesn't set exit code on error (#15591)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
* KAFKA-16374; High watermark updates should have a higher priority (#15534)
When the group coordinator is under heavy load, the current mechanism to release pending events based on updated high watermark, which consist in pushing an event at the end of the queue, is bad because pending events pay the cost of the queue twice. A first time for the handling of the first event and a second time for the handling of the hwm update. This patch changes this logic to push the hwm update event to the front of the queue in order to release pending events as soon as as possible.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
* KAFKA-15882: Add nightly docker image scan job (#15013)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
* KAFKA-16375: Fix for rejoin while reconciling (#15579)
This PR includes a fix to properly identify a reconciliation that should be interrupted and not applied because the member has rejoined. It does so simply based on a flag (not epochs, server or local). If the member has rejoined while reconciling, the reconciliation will be interrupted.
This also ensures that the check to abort the reconciliation is performed on all the 3 stages of the reconciliation that could be delayed: commit, onPartitionsRevoked, onPartitionsAssigned.
Reviewers: David Jacot <djacot@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
* KAFKA-16406: Splitting consumer integration test (#15535)
Splitting consumer integration tests to allow for parallelization and reduce build times. This PR is only extracting tests from PlainTextConsumerTest into separate files, no changes in logic. Grouping tests by the feature they relate to so that they can be easily found
Reviewers: Andrew Schofield <aschofield@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
* KAFKA-15950: Serialize heartbeat requests (#14903)
In between HeartbeatRequest being sent and the response being handled,
i.e. while a HeartbeatRequest is in flight, an extra request may be
immediately scheduled if propagateDirectoryFailure, setReadyToUnfence,
or beginControlledShutdown is called.
To prevent the extra request, we can avoid the extra requests by checking
whether a request is in flight, and delay the scheduling if necessary.
Some of the tests in BrokerLifecycleManagerTest are also improved to
remove race conditions and reduce flakiness.
Reviewers: Colin McCabe <colin@cmccabe.xyz>, Ron Dagostino <rdagostino@confluent.io>, Jun Rao <junrao@gmail.com>
* KAFKA-16224: Do not retry committing if topic or partition deleted (#15581)
Current logic for auto-committing offsets when partitions are revoked
will retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION,
leading to the member not completing the revocation in time.
This commit considers error UNKNOWN_TOPIC_OR_PARTITION to be fatal
in the context of an auto-commit of offsets before a revocation,
even though the error is defined as retriable. This ensures that
the revocation can finish in time.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>, Lianet Magrans <lianetmr@gmail.com>
* KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification (#15559)
KIP-890 Part 1 introduced verification of transactions with the
transaction coordinator on the `Produce` and `TxnOffsetCommit` paths.
This introduced the possibility of new errors when responding to those
requests. For backwards compatibility with older clients, a choice was
made to convert some of the new retriable errors to existing errors that
are expected and retried correctly by older clients.
`NETWORK_EXCEPTION` was forgotten about and not converted, but can occur
if, for example, the transaction coordinator is temporarily refusing
connections. Now, we convert it to:
* `NOT_ENOUGH_REPLICAS` on the `Produce` path, just like the other
retriable errors that can arise from transaction verification.
* `COORDINATOR_LOAD_IN_PROGRESS` on the `TxnOffsetCommit` path. This
error does not force coordinator lookup on clients, unlike
`COORDINATOR_NOT_AVAILABLE`. Note that this deviates from KIP-890,
which says that retriable errors should be converted to
`COORDINATOR_NOT_AVAILABLE`.
Reviewers: Artem Livshits <alivshits@confluent.io>, David Jacot <djacot@confluent.io>, Justine Olshan <jolshan@confluent.io>
* KAFKA-16409: DeleteRecordsCommand should use standard exception handling (#15586)
DeleteRecordsCommand should use standard exception handling
Reviewers: Luke Chen <showuon@gmail.com>
* KAFKA-16415 Fix handling of "--version" option in ConsumerGroupCommand (#15592)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
* fix(test): fix ElasticUnifiedLog test
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
---------
Signed-off-by: Greg Harris <greg.harris@aiven.io>
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
Co-authored-by: Gaurav Narula <gaurav_narula2@apple.com>
Co-authored-by: Philip Nee <pnee@confluent.io>
Co-authored-by: David Jacot <djacot@confluent.io>
Co-authored-by: John Yu <54207775+chiacyu@users.noreply.github.com>
Co-authored-by: Christo Lolov <lolovc@amazon.com>
Co-authored-by: Ritika Reddy <98577846+rreddy-22@users.noreply.github.com>
Co-authored-by: Kirk True <kirk@kirktrue.pro>
Co-authored-by: Lucas Brutschy <lbrutschy@confluent.io>
Co-authored-by: Jamie <holmes.jc@gmail.com>
Co-authored-by: Anuj Sharma <philomath.anuj@gmail.com>
Co-authored-by: Jamie Holmes <jamie.holmes@tesco.com>
Co-authored-by: Christopher Webb <31657038+cwebbtw@users.noreply.github.com>
Co-authored-by: Said Boudjelda <bmscomp@gmail.com>
Co-authored-by: PoAn Yang <yangpoan@gmail.com>
Co-authored-by: Ayoub Omari <ayoubomari1@outlook.fr>
Co-authored-by: Ismael Juma <ismael@juma.me.uk>
Co-authored-by: Gyeongwon, Do <dct012@gmail.com>
Co-authored-by: Bruno Cadonna <cadonna@apache.org>
Co-authored-by: Greg Harris <greg.harris@aiven.io>
Co-authored-by: Nikolay <nizhikov@apache.org>
Co-authored-by: Dongnuo Lyu <139248811+dongnuo123@users.noreply.github.com>
Co-authored-by: Colin Patrick McCabe <cmccabe@apache.org>
Co-authored-by: Victor van den Hoven <victor.vanden.hoven@alliander.com>
Co-authored-by: Cheng-Kai, Zhang <kevin.zhang.tw@gmail.com>
Co-authored-by: Johnny Hsu <44309740+johnnychhsu@users.noreply.github.com>
Co-authored-by: Matthias J. Sax <matthias@confluent.io>
Co-authored-by: Hector Geraldino <hgeraldino@gmail.com>
Co-authored-by: Dmitry Werner <grimekillah@gmail.com>
Co-authored-by: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
Co-authored-by: Andrew Schofield <aschofield@confluent.io>
Co-authored-by: PoAn Yang <payang@apache.org>
Co-authored-by: Dung Ha <60119105+infantlikesprogramming@users.noreply.github.com>
Co-authored-by: Owen Leung <owen.leung2@gmail.com>
Co-authored-by: testn <test1@doramail.com>
Co-authored-by: Daan Gerits <daan.gerits@gmail.com>
Co-authored-by: Joel Hamill <11722533+joel-hamill@users.noreply.github.com>
Co-authored-by: Kamal Chandraprakash <kchandraprakash@uber.com>
Co-authored-by: Cheryl Simmons <csimmons@confluent.io>
Co-authored-by: José Armando García Sancio <jsancio@users.noreply.github.com>
Co-authored-by: Andras Katona <41361962+akatona84@users.noreply.github.com>
Co-authored-by: David Mao <47232755+splett2@users.noreply.github.com>
Co-authored-by: Luke Chen <showuon@gmail.com>
Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
Co-authored-by: Chris Holland <41524756+ChrisAHolland@users.noreply.github.com>
Co-authored-by: TapDang <89607407+phong260702@users.noreply.github.com>
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Artem Livshits <84364232+artemlivshits@users.noreply.github.com>
Co-authored-by: Kuan-Po (Cooper) Tseng <brandboat@gmail.com>
Co-authored-by: Manikumar Reddy <manikumar.reddy@gmail.com>
Co-authored-by: Chris Egerton <chrise@aiven.io>
Co-authored-by: Alyssa Huang <ahuang@confluent.io>
Co-authored-by: Sanskar Jhajharia <122860866+sjhajharia@users.noreply.github.com>
Co-authored-by: Vedarth Sharma <142404391+VedarthConfluent@users.noreply.github.com>
Co-authored-by: Lianet Magrans <98415067+lianetm@users.noreply.github.com>
Co-authored-by: Igor Soarez <i@soarez.me>
Co-authored-by: Sean Quah <squah@confluent.io>
2024-03-26 21:24:16 +08:00
< div class = "section" id = "readonly-state-stores" >
< span id = "streams-developer-guide-state-store-readonly" > < / span > < h3 > < a class = "toc-backref" href = "#id12" > ReadOnly State Stores< / a > < a class = "headerlink" href = "#readonly-state-stores" title = "Permalink to this headline" > < / a > < / h3 >
< p > A read-only state store materializes the data from its input topic. It also uses the input topic
for fault tolerance, and thus does not have an additional changelog topic (the input topic is
re-used as the changelog). For this reason, the input topic should be configured with < a class = "reference external" href = "https://kafka.apache.org/documentation.html#compaction" > log compaction< / a > .
Note that no other processor should modify the content of the state store; the only writer
should be the associated "state update processor", while other processors may read the content of the
read-only store.< / p >
< p > < b > Note:< / b > Beware of the partitioning requirements when using read-only state stores for lookups during
processing. You should ensure that the input topic of the read-only state store is co-partitioned with the
input topics of the processors that read the read-only state store.< / p >
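< p > For example, here is a minimal sketch of wiring a read-only state store with
< code class = "docutils literal" > < span class = "pre" > Topology#addReadOnlyStateStore< / span > < / code > (added by KIP-813). The store, topic, and
processor names below are illustrative, and logging is disabled explicitly because the input topic serves as the changelog:< / p >
< pre class = "line-numbers" > < code class = "language-java" > Topology topology = new Topology();
topology.addReadOnlyStateStore(
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("lookup-store"),
            Serdes.String(),
            Serdes.String())
        .withLoggingDisabled(), // the input topic is re-used as the changelog
    "lookup-source",            // source node reading the compacted input topic
    new StringDeserializer(),
    new StringDeserializer(),
    "lookup-topic",             // compacted input topic
    "lookup-updater",           // the "state update processor", i.e., the only writer
    () -> new Processor< String, String, Void, Void> () {
        private KeyValueStore< String, String> store;

        @Override
        public void init(final ProcessorContext< Void, Void> context) {
            store = context.getStateStore("lookup-store");
        }

        @Override
        public void process(final Record< String, String> record) {
            store.put(record.key(), record.value());
        }
    });< / code > < / pre >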
< / div >
< div class = "section" id = "implementing-custom-state-stores" >
< span id = "streams-developer-guide-state-store-custom" > < / span > < h3 > < a class = "toc-backref" href = "#id7" > Implementing Custom State Stores< / a > < a class = "headerlink" href = "#implementing-custom-state-stores" title = "Permalink to this headline" > < / a > < / h3 >
< p > You can use the < a class = "reference internal" href = "#streams-developer-guide-state-store-defining" > < span class = "std std-ref" > built-in state store types< / span > < / a > or implement your own.
The primary interface to implement for the store is
< code class = "docutils literal" > < span class = "pre" > org.apache.kafka.streams.processor.StateStore< / span > < / code > . Kafka Streams also has a few extended interfaces such
as < code class = "docutils literal" > < span class = "pre" > KeyValueStore< / span > < / code > and < code class = "docutils literal" > < span class = "pre" > VersionedKeyValueStore< / span > < / code > .< / p >
< p > Note that your customized < code class = "docutils literal" > < span class = "pre" > org.apache.kafka.streams.processor.StateStore< / span > < / code > implementation also needs to provide the logic for restoring its state
via the < code class = "docutils literal" > < span class = "pre" > org.apache.kafka.streams.processor.StateRestoreCallback< / span > < / code > or < code class = "docutils literal" > < span class = "pre" > org.apache.kafka.streams.processor.BatchingStateRestoreCallback< / span > < / code > interface.
Details on how to instantiate these interfaces can be found in the < a class = "reference external" href = "/{{version}}/javadoc/org/apache/kafka/streams/processor/StateStore.html" > javadocs< / a > .< / p >
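< p > For example, here is a sketch of a batching restore callback, assuming a hypothetical in-memory map
that backs the custom store:< / p >
< pre class = "line-numbers" > < code class = "language-java" > final Map< Bytes, byte[]> inMemoryMap = new HashMap< > ();

final BatchingStateRestoreCallback restoreCallback = new BatchingStateRestoreCallback() {
    @Override
    public void restoreAll(final Collection< KeyValue< byte[], byte[]> > records) {
        // bulk-load the restored changelog records into the store's backing structure
        for (final KeyValue< byte[], byte[]> record : records) {
            inMemoryMap.put(Bytes.wrap(record.key), record.value);
        }
    }
};
// the callback is registered from within StateStore#init(), e.g.:
// context.register(root, restoreCallback);< / code > < / pre >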
< p > You also need to provide a “builder” for the store by implementing the
< code class = "docutils literal" > < span class = "pre" > org.apache.kafka.streams.state.StoreBuilder< / span > < / code > interface, which Kafka Streams uses to create instances of
your store.< / p >
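< p > For example, here is a sketch of a store builder for a hypothetical custom store type
< code class = "docutils literal" > < span class = "pre" > MyCustomStore< / span > < / code > (the store class itself is assumed to
implement < code class = "docutils literal" > < span class = "pre" > StateStore< / span > < / code > and is not shown):< / p >
< pre class = "line-numbers" > < code class = "language-java" > public class MyCustomStoreBuilder implements StoreBuilder< MyCustomStore> {
    private final String name;
    private Map< String, String> logConfig = new HashMap< > ();
    private boolean loggingEnabled = true;

    public MyCustomStoreBuilder(final String name) {
        this.name = name;
    }

    @Override
    public MyCustomStoreBuilder withCachingEnabled() {
        throw new UnsupportedOperationException("caching is not supported by this store");
    }

    @Override
    public MyCustomStoreBuilder withCachingDisabled() {
        return this; // caching is always disabled for this store
    }

    @Override
    public MyCustomStoreBuilder withLoggingEnabled(final Map< String, String> config) {
        loggingEnabled = true;
        logConfig = config;
        return this;
    }

    @Override
    public MyCustomStoreBuilder withLoggingDisabled() {
        loggingEnabled = false;
        return this;
    }

    @Override
    public MyCustomStore build() {
        return new MyCustomStore(name); // a new store instance per task
    }

    @Override
    public Map< String, String> logConfig() {
        return logConfig;
    }

    @Override
    public boolean loggingEnabled() {
        return loggingEnabled;
    }

    @Override
    public String name() {
        return name;
    }
}< / code > < / pre >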
< / div >
< / div >
< div class = "section" id = "accessing-processor-context" >
< h2 > < a class = "toc-backref" href = "#id10" > Accessing Processor Context< / a > < a class = "headerlink" href = "#accessing-processor-context" title = "Permalink to this headline" > < / a > < / h2 >
< p > As we have mentioned in the < a href = "#defining-a-stream-processor" > Defining a Stream Processor< / a > section, a < code > ProcessorContext< / code > controls the processing workflow, such as scheduling a punctuation function and committing the currently processed state.< / p >
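< p > For example, here is a sketch of scheduling a punctuation from < code class = "docutils literal" > < span class = "pre" > init()< / span > < / code >
(the interval and punctuation type are illustrative choices):< / p >
< pre class = "line-numbers" > < code class = "language-java" > @Override
public void init(final ProcessorContext< String, String> context) {
    // invoke the callback every 10 seconds of wall-clock time
    context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        // ... periodic work, e.g., forwarding aggregated results downstream ...
        context.commit(); // request a commit of the current processing progress
    });
}< / code > < / pre >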
< p > This object can also be used to access metadata related to the application, such as
< code class = "docutils literal" > < span class = "pre" > applicationId< / span > < / code > , < code class = "docutils literal" > < span class = "pre" > taskId< / span > < / code > ,
and < code class = "docutils literal" > < span class = "pre" > stateDir< / span > < / code > , as well as record-related metadata such as < code class = "docutils literal" > < span class = "pre" > topic< / span > < / code > ,
< code class = "docutils literal" > < span class = "pre" > partition< / span > < / code > , < code class = "docutils literal" > < span class = "pre" > offset< / span > < / code > , < code class = "docutils literal" > < span class = "pre" > timestamp< / span > < / code > and
< code class = "docutils literal" > < span class = "pre" > headers< / span > < / code > .< / p >
< p > Here is an example implementation of how to add a new header to the record:< / p >
< pre class = "line-numbers" > < code class = "language-java" > public void process(String key, String value) {
    // add a header to the record (header values are byte arrays)
    context().headers().add("key", "value".getBytes());
}< / code > < / pre >
< div class = "section" id = "connecting-processors-and-state-stores" >
< h2 > < a class = "toc-backref" href = "#id8" > Connecting Processors and State Stores< / a > < a class = "headerlink" href = "#connecting-processors-and-state-stores" title = "Permalink to this headline" > < / a > < / h2 >
< p > Now that a < a class = "reference internal" href = "#streams-developer-guide-stream-processor" > < span class = "std std-ref" > processor< / span > < / a > (WordCountProcessor) and the
state stores have been defined, you can construct the processor topology by connecting these processors and state stores together
using the < code class = "docutils literal" > < span class = "pre" > Topology< / span > < / code > instance. In addition, you can add source processors with the specified Kafka topics
to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate
output data streams out of the topology.< / p >
< p > Here is an example implementation:< / p >
< pre class = "line-numbers" > < code class = "language-java" > Topology builder = new Topology();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
// add the WordCountProcessor node which takes the source processor as its upstream processor
.addProcessor("Process", () -> new WordCountProcessor(), "Source")
// add the count store associated with the WordCountProcessor processor
.addStateStore(countStoreBuilder, "Process")
// add the sink processor node that takes Kafka topic "sink-topic" as output
// and the WordCountProcessor node as its upstream processor
.addSink("Sink", "sink-topic", "Process");< / code > < / pre >
< p > Here is a quick explanation of this example:< / p >
< ul class = "simple" >
< li > A source processor node named < code class = "docutils literal" > < span class = "pre" > " Source" < / span > < / code > is added to the topology using the < code class = "docutils literal" > < span class = "pre" > addSource< / span > < / code > method, with one Kafka topic
< code class = "docutils literal" > < span class = "pre" > " source-topic" < / span > < / code > fed to it.< / li >
< li > A processor node named < code class = "docutils literal" > < span class = "pre" > " Process" < / span > < / code > with the pre-defined < code class = "docutils literal" > < span class = "pre" > WordCountProcessor< / span > < / code > logic is then added as the downstream
processor of the < code class = "docutils literal" > < span class = "pre" > " Source" < / span > < / code > node using the < code class = "docutils literal" > < span class = "pre" > addProcessor< / span > < / code > method.< / li >
< li > A predefined persistent key-value state store is created and associated with the < code class = "docutils literal" > < span class = "pre" > " Process" < / span > < / code > node, using
< code class = "docutils literal" > < span class = "pre" > countStoreBuilder< / span > < / code > .< / li >
< li > A sink processor node is then added to complete the topology using the < code class = "docutils literal" > < span class = "pre" > addSink< / span > < / code > method, taking the < code class = "docutils literal" > < span class = "pre" > " Process" < / span > < / code > node
as its upstream processor and writing to a separate < code class = "docutils literal" > < span class = "pre" > " sink-topic" < / span > < / code > Kafka topic (note that users can also use another overloaded variant of < code class = "docutils literal" > < span class = "pre" > addSink< / span > < / code >
to dynamically determine the Kafka topic to write to for each received record from the upstream processor).< / li >
2017-12-22 03:15:54 +08:00
< / ul >
< p > In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor to the topology.
This can be done by implementing < code class = "docutils literal" > < span class = "pre" > ConnectedStoreProvider#stores()< / span > < / code > on the < code class = "docutils literal" > < span class = "pre" > ProcessorSupplier< / span > < / code >
instead of calling < code class = "docutils literal" > < span class = "pre" > Topology#addStateStore()< / span > < / code > , like this:
< / p >
< pre class = "line-numbers" > < code class = "language-java" > Topology builder = new Topology();
// add the source processor node that takes Kafka "source-topic" as input
builder.addSource("Source", "source-topic")
// add the WordCountProcessor node which takes the source processor as its upstream processor.
// the ProcessorSupplier provides the count store associated with the WordCountProcessor
.addProcessor("Process", new ProcessorSupplier< String, String, String, String> () {
public Processor< String, String, String, String> get() {
return new WordCountProcessor();
}
public Set< StoreBuilder< ?> > stores() {
final StoreBuilder< KeyValueStore< String, Long> > countsStoreBuilder =
Stores
.keyValueStoreBuilder(
Stores.persistentKeyValueStore("Counts"),
Serdes.String(),
Serdes.Long()
);
return Collections.singleton(countsStoreBuilder);
}
}, "Source")
// add the sink processor node that takes Kafka topic "sink-topic" as output
// and the WordCountProcessor node as its upstream processor
.addSink("Sink", "sink-topic", "Process");< / code > < / pre >
< p > This allows a processor to "own" state stores, effectively encapsulating their usage from the user wiring the topology.
Multiple processors that share a state store may provide the same store with this technique, as long as the < code class = "docutils literal" > < span class = "pre" > StoreBuilder< / span > < / code > is the same < code class = "docutils literal" > < span class = "pre" > instance< / span > < / code > .< / p >
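< p > For example, here is a sketch of two processors sharing one store, assuming hypothetical supplier classes
that each receive the shared builder (the processors themselves are not shown):< / p >
< pre class = "line-numbers" > < code class = "language-java" > final StoreBuilder< KeyValueStore< String, Long> > sharedStoreBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("Shared"),
        Serdes.String(),
        Serdes.Long());

// both suppliers return Collections.singleton(sharedStoreBuilder) from stores(),
// so the "Shared" store is created once and connected to both processor nodes
builder.addProcessor("ProcessA", new ProcessorASupplier(sharedStoreBuilder), "Source")
       .addProcessor("ProcessB", new ProcessorBSupplier(sharedStoreBuilder), "Source");< / code > < / pre >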
< p > In these topologies, the < code class = "docutils literal" > < span class = "pre" > " Process" < / span > < / code > stream processor node is considered a downstream processor of the < code class = "docutils literal" > < span class = "pre" > " Source" < / span > < / code > node, and an
upstream processor of the < code class = "docutils literal" > < span class = "pre" > " Sink" < / span > < / code > node. As a result, whenever the < code class = "docutils literal" > < span class = "pre" > " Source" < / span > < / code > node forwards a newly fetched record from
Kafka to its downstream < code class = "docutils literal" > < span class = "pre" > " Process" < / span > < / code > node, the < code class = "docutils literal" > < span class = "pre" > WordCountProcessor#process()< / span > < / code > method is triggered to process the record and
update the associated state store. Whenever < code class = "docutils literal" > < span class = "pre" > context#forward()< / span > < / code > is called in the
< code class = "docutils literal" > < span class = "pre" > WordCountProcessor#punctuate()< / span > < / code > method, the aggregate key-value pair will be sent via the < code class = "docutils literal" > < span class = "pre" > " Sink" < / span > < / code > processor node to
the Kafka topic < code class = "docutils literal" > < span class = "pre" > " sink-topic" < / span > < / code > . Note that in the < code class = "docutils literal" > < span class = "pre" > WordCountProcessor< / span > < / code > implementation, you must refer to the
same store name < code class = "docutils literal" > < span class = "pre" > " Counts" < / span > < / code > when accessing the key-value store, otherwise an exception will be thrown at runtime,
indicating that the state store cannot be found. If the state store is not associated with the processor
in the < code class = "docutils literal" > < span class = "pre" > Topology< / span > < / code > code, accessing it in the processor's < code class = "docutils literal" > < span class = "pre" > init()< / span > < / code > method will also throw an exception at
runtime, indicating the state store is not accessible from this processor.< / p >
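< p > For example, here is a sketch of how < code class = "docutils literal" > < span class = "pre" > WordCountProcessor#init()< / span > < / code >
might look up the store, using the same name it was registered with:< / p >
< pre class = "line-numbers" > < code class = "language-java" > private KeyValueStore< String, Long> kvStore;

@Override
public void init(final ProcessorContext< String, String> context) {
    // must match the store name used when wiring the Topology
    kvStore = context.getStateStore("Counts");
}< / code > < / pre >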
< p > Note that the < code class = "docutils literal" > < span class = "pre" > Topology#addProcessor< / span > < / code > function takes a < code class = "docutils literal" > < span class = "pre" > ProcessorSupplier< / span > < / code > as argument, and that the supplier pattern requires that a new
< code class = "docutils literal" > < span class = "pre" > Processor< / span > < / code > instance is returned each time < code class = "docutils literal" > < span class = "pre" > ProcessorSupplier#get()< / span > < / code > is called. Creating a single < code class = "docutils literal" > < span class = "pre" > Processor< / span > < / code >
object and returning the same object reference in < code class = "docutils literal" > < span class = "pre" > ProcessorSupplier#get()< / span > < / code > would be a violation of the supplier pattern and leads to runtime exceptions.
So remember not to provide a singleton < code class = "docutils literal" > < span class = "pre" > Processor< / span > < / code > instance to < code class = "docutils literal" > < span class = "pre" > Topology< / span > < / code > .< / p >
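< p > For example, reusing < code class = "docutils literal" > < span class = "pre" > WordCountProcessor< / span > < / code > from the examples above:< / p >
< pre class = "line-numbers" > < code class = "language-java" > // correct: a fresh Processor instance is returned on every get() call
builder.addProcessor("Process", WordCountProcessor::new, "Source");

// incorrect: every get() call returns the same singleton instance,
// which leads to runtime exceptions
final WordCountProcessor singleton = new WordCountProcessor();
builder.addProcessor("Process", () -> singleton, "Source");< / code > < / pre >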
< p > Now that you have fully defined your processor topology in your application, you can proceed to
< a class = "reference internal" href = "running-app.html#streams-developer-guide-execution" > < span class = "std std-ref" > running the Kafka Streams application< / span > < / a > .< / p >
< / div >
< / div >
< / div >
< / div >
< div class = "pagination" >
< a href = "/{{version}}/documentation/streams/developer-guide/dsl-api" class = "pagination__btn pagination__btn__prev" > Previous< / a >
< a href = "/{{version}}/documentation/streams/developer-guide/datatypes" class = "pagination__btn pagination__btn__next" > Next< / a >
< / div >
< / script >
<!-- #include virtual="../../../includes/_header.htm" -->
<!-- #include virtual="../../../includes/_top.htm" -->
< div class = "content documentation " >
<!-- #include virtual="../../../includes/_nav.htm" -->
< div class = "right" >
<!-- //#include virtual="../../../includes/_docs_banner.htm" -->
< ul class = "breadcrumbs" >
< li > < a href = "/documentation" > Documentation< / a > < / li >
< li > < a href = "/documentation/streams" > Kafka Streams< / a > < / li >
< li > < a href = "/documentation/streams/developer-guide/" > Developer Guide< / a > < / li >
< / ul >
< div class = "p-content" > < / div >
< / div >
< / div >
<!-- #include virtual="../../../includes/_footer.htm" -->
< script >
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
//sticky secondary nav
var $navbar = $(".sub-nav-sticky"),
y_pos = $navbar.offset().top,
height = $navbar.height();
$(window).scroll(function() {
var scrollTop = $(window).scrollTop();
if (scrollTop > y_pos - height) {
$navbar.addClass("navbar-fixed")
} else if (scrollTop <= y_pos) {
$navbar.removeClass("navbar-fixed")
}
});
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
< / script >