This is a long story, but the incident started in KAFKA-13419, where we observed a member reporting a topic partition owned in a previous generation after it missed a rebalance cycle due to REBALANCE_IN_PROGRESS.
This patch changes the AbstractStickyAssignor.allSubscriptionsEqual method. In short, it should no longer validate ownership against only the highest generation. Instead, we consider 3 cases:
1. A member continues to hold on to its partition if there are no other claimed owners.
2. If multiple members claim the same partition, the one with the highest generation wins.
3. If two members of the same generation claim the same partition, we log an error and remove the partition from both assignments. (Same as the current logic.)
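The three cases above can be sketched as a small ownership-resolution helper. This is an illustrative standalone sketch, not the actual AbstractStickyAssignor code; the method and parameter names are invented for the example:

```java
import java.util.*;

public class OwnerResolution {
    // Given the members claiming one partition, mapped to the generation in
    // which each claims ownership, decide who (if anyone) keeps it.
    static Optional<String> resolveOwner(Map<String, Integer> claimsByMember) {
        if (claimsByMember.isEmpty())
            return Optional.empty();               // case 1 handled by caller: sole owner keeps it
        int maxGen = Collections.max(claimsByMember.values());
        List<String> top = new ArrayList<>();
        for (Map.Entry<String, Integer> e : claimsByMember.entrySet())
            if (e.getValue() == maxGen)
                top.add(e.getKey());
        if (top.size() == 1)
            return Optional.of(top.get(0));        // case 2: highest generation wins
        // Case 3: multiple claims at the same generation -> log an error and
        // remove the partition from every claimant's assignment.
        System.err.println("Conflicting ownership at generation " + maxGen + ": " + top);
        return Optional.empty();
    }
}
```

A sole claimant trivially falls into the single-entry branch, so all three cases reduce to "keep the unique highest-generation claimant, if any".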
Here are some important notes that lead to the patch:
- If a member is kicked out of the group, `UNKNOWN_MEMBER_ID` is thrown.
- It seems to be a common situation that members are late to joinGroup and therefore get the `REBALANCE_IN_PROGRESS` error. This is why we don't want to reset the generation: doing so might cause lots of revocations and can be disruptive.
To summarize the current behavior of different errors:
`REBALANCE_IN_PROGRESS`
- heartbeat: requestRejoin if member state is stable
- joinGroup: rejoin immediately
- syncGroup: rejoin immediately
- commit: requestRejoin and fail the commit. Raise this exception if the generation is stale, i.e. another rebalance is already in progress.
`UNKNOWN_MEMBER_ID`
- heartbeat: resetStateAndRejoin if the generation hasn't changed; otherwise, ignore
- joinGroup: resetStateAndRejoin if generation unchanged, otherwise rejoin immediately
- syncGroup: resetStateAndRejoin if generation unchanged, otherwise rejoin immediately
`ILLEGAL_GENERATION`
- heartbeat: resetStateAndRejoin if the generation hasn't changed; otherwise, ignore
- syncGroup: raise the exception if the generation has been reset or the member hasn't completed rebalancing; then resetStateAndRejoin if the generation is unchanged, otherwise rejoin immediately
Reviewers: David Jacot <djacot@confluent.io>
1. Add ZkMigrationReady to ApiVersionsResponse
2. Check that all nodes report ZkMigrationReady before moving to the next migration state
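The gating logic in step 2 amounts to an all-nodes check. A minimal sketch, with invented names (the real code reads the flag from each broker's ApiVersionsResponse):

```java
import java.util.Map;

public class MigrationGate {
    // Only advance the migration state once every known broker reports
    // zkMigrationReady = true in its ApiVersionsResponse.
    static boolean canAdvance(Map<Integer, Boolean> readyByBrokerId) {
        return !readyByBrokerId.isEmpty()
            && readyByBrokerId.values().stream().allMatch(Boolean::booleanValue);
    }
}
```

An empty map is treated as "not ready", since no broker has reported yet.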
Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
Don't allow setting negative or zero values for quotas. Don't allow SCRAM mechanism names to be
used as client quota names. SCRAM mechanisms are not client quotas. (The confusion arose because of
internal ZK representation details that treated them both as "client configs.")
Add unit tests for ClientQuotaControlManager.isValidIpEntity and
ClientQuotaControlManager.configKeysForEntityType.
This change doesn't affect metadata record application, only input validation. If there are bad
client quotas that are set currently, this change will not alter the current behavior (of throwing
an exception and ignoring the bad quota).
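The validation rules described above can be sketched as follows. This is an illustrative standalone sketch, not the actual ClientQuotaControlManager code; the class and method names are invented:

```java
import java.util.Set;

public class QuotaValidation {
    // SCRAM mechanism names are not client quotas, even though the internal
    // ZK representation historically treated both as "client configs".
    static final Set<String> SCRAM_MECHANISMS =
        Set.of("SCRAM-SHA-256", "SCRAM-SHA-512");

    static void validateQuota(String name, double value) {
        if (SCRAM_MECHANISMS.contains(name))
            throw new IllegalArgumentException(
                name + " is a SCRAM mechanism name, not a client quota name");
        if (value <= 0)
            throw new IllegalArgumentException(
                "quota " + name + " must be strictly positive, got " + value);
    }
}
```

Note this runs at input-validation time only; records already in the metadata log are applied as before.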
The sslEngine.setUseClientMode(false) call was duplicated in DefaultSslEngineFactory.java when creating an SSLEngine in server mode. This patch removes the duplicated call.
Reviewers: maulin-vasavada <maulin.vasavada@gmail.com>, Divij Vaidya <diviv@amazon.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
We were handling the complex ListOffsets workflow by chaining together MetadataCall and ListOffsetsCall instances, which involved a lot of complex and error-prone logic. In this PR we rewrote it on top of the `AdminApiDriver` infrastructure. Notable improvements over the old logic:
1. Retry the lookup stage on `NOT_LEADER_OR_FOLLOWER` and `LEADER_NOT_AVAILABLE`, whereas in the past we failed the partition directly without retrying.
2. Removed the class field `supportsMaxTimestamp` and calculate it on the fly to avoid mutable state; this doesn't change any client behavior.
3. Retry the fulfillment stage on any `RetriableException`, whereas in the past we only retried the fulfillment stage on `InvalidMetadataException`; this means we now also retry on `TimeoutException` and other `RetriableException`s.
We also added `handleUnsupportedVersionException` to `AdminApiHandler` and `AdminApiLookupStrategy`; these keep consistency with the old logic, and we can continue to improve them.
Reviewers: Ziming Deng <dengziming1993@gmail.com>, David Jacot <djacot@confluent.io>
KafkaStatusBackingStore used infinite retry logic in the producer send callback, which could lead to a stack overflow.
To avoid the problem, a background thread was added, and the callback now submits the retry onto that background thread.
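The shape of the fix can be sketched as below. This is an illustrative standalone sketch with invented names, not the actual KafkaStatusBackingStore code; `send.run()` stands in for `producer.send(record, callback)`:

```java
import java.util.concurrent.*;

public class RetryOffload {
    // Retrying recursively inside the send callback can grow the call stack
    // without bound; instead, the failure path schedules the retry on a
    // dedicated background thread, so each attempt starts from a fresh stack.
    private final ScheduledExecutorService retryThread =
        Executors.newSingleThreadScheduledExecutor(task -> {
            Thread t = new Thread(task, "status-store-retry");
            t.setDaemon(true);
            return t;
        });

    void sendWithRetry(Runnable send, int attemptsLeft) {
        try {
            send.run();
        } catch (RuntimeException e) {
            if (attemptsLeft > 0)
                retryThread.schedule(() -> sendWithRetry(send, attemptsLeft - 1),
                                     10, TimeUnit.MILLISECONDS);
        }
    }
}
```

Each retry is a fresh task on the executor rather than a deeper stack frame, which is the essence of the fix.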
Added a check for an ongoing transaction.
Added a thread to send and receive verify-only AddPartitionsToTxn requests.
Code to send on the request thread courtesy of @artemlivshits.
Reviewers: Artem Livshits <alivshits@confluent.io>, Jun Rao <junrao@gmail.com>
The SnapshotReader exposes the "last contained log time". This is mainly used during snapshot cleanup. The previous implementation used the append time of the snapshot record. This is not accurate, as that is the time when the snapshot was created, not the log append time of the last record included in the snapshot.
The log append time of the last record included in the snapshot is stored in the header control record of the snapshot. The header control record is the first record of the snapshot.
To be able to read this record, this change extends the RecordsIterator to decode and expose the control records in the Records type.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Jordan Moore <crikket.007@gmail.com>, Chris Egerton <fearthecellos@gmail.com>
Implement KIP-900
Update kafka-storage to be able to add SCRAM records to the bootstrap metadata file at format time so that SCRAM is enabled at initial start (bootstrap) of KRaft cluster. Includes unit tests.
Update ./core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala to use bootstrap and
enable the test to run with both ZK and KRaft quorum.
Moved the single test from ScramServerStartupTest.scala into SaslScramSslEndToEndAuthorizationTest.scala. That test is really small, so there was no point in recreating all the bootstrap startup for a 5-line test when it could easily run elsewhere.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Manikumar Reddy <manikumar.reddy@gmail.com>
Best-effort rack alignment for sticky assignors when both consumer racks and partition racks are available with the protocol changes introduced in KIP-881. Rack-aware assignment is enabled by configuring client.rack for consumers. The assignment builders attempt to align on racks on a best-effort basis, but prioritize balanced assignment over rack alignment.
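Rack-aware assignment is opt-in via consumer configuration. A minimal consumer config sketch (the rack value and group name are placeholders):

```properties
# Enable rack-aware assignment (KIP-881): the consumer reports its rack
# to the group coordinator via client.rack.
client.rack=us-east-1a
group.id=my-group
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
```

If `client.rack` is unset, or partition replica racks are unavailable, the assignors fall back to their usual rack-unaware behavior.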
Reviewers: David Jacot <djacot@confluent.io>
When `client.rack` is configured for consumers, we perform rack-aware consumer partition assignment to improve locality. After or during reassignments, replica racks may change, so to ensure optimal consumer assignment, the leader triggers a rebalance when the set of racks of any partition changes.
Reviewers: David Jacot <djacot@confluent.io>
On startup, we always update the metadata, and the topic ID goes from null to defined. Move the epoch-is-null check before the topic ID check to prevent log spam.
Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
In this PR, I implemented the committed API. Here are the specifics:
* the CommitRequestManager handles committed() request.
* I implemented an UnsentOffsetFetchRequestState to dedupe requests, because we don't want to send identical requests repeatedly.
* I implemented the retry mechanism: some retriable errors are retried automatically.
* ClientResponse errors are handled in the handlers.
* Some of the top-level APIs were refactored lightly.
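The deduplication idea can be sketched as sharing one in-flight future per identical request. This is an illustrative standalone sketch with invented names, not the actual CommitRequestManager/UnsentOffsetFetchRequestState code:

```java
import java.util.*;
import java.util.concurrent.*;

public class DedupedFetch {
    // Callers asking for the committed offsets of the same partition set
    // share a single in-flight future instead of re-sending the request.
    private final Map<Set<String>, CompletableFuture<Map<String, Long>>> inflight =
        new HashMap<>();

    synchronized CompletableFuture<Map<String, Long>> fetchCommitted(Set<String> partitions) {
        return inflight.computeIfAbsent(partitions, p -> new CompletableFuture<>());
    }

    // Invoked when the broker response arrives: complete and drop the future.
    synchronized void complete(Set<String> partitions, Map<String, Long> offsets) {
        CompletableFuture<Map<String, Long>> f = inflight.remove(partitions);
        if (f != null)
            f.complete(offsets);
    }
}
```

Two concurrent `committed()` calls for the same partitions then resolve from the same response.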
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Fix for an NPE caused by referring to a local variable instead of the instance variable of the deserializers.
Co-authored-by: Robert Yokota <1761488+rayokota@users.noreply.github.com>
Reviewers: Robert Yokota <1761488+rayokota@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
* KAFKA-14365: Extract common logic from Fetcher
Extract logic from Fetcher into AbstractFetcher.
Also introduce FetchConfig as a more concise way to delineate state from
incoming configuration.
Formalized the defaults in CommonClientConfigs and ConsumerConfig to be
accessible elsewhere.
* Removed overridden methods in favor of synchronizing where needed
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This patch is the first part of KIP-903. It updates the FetchRequest to include the new tagged ReplicaState field which replaces the now deprecated ReplicaId field. The FetchRequest version is bumped to version 15 and the MetadataVersion to 3.5-IV1.
Reviewers: David Jacot <djacot@confluent.io>
This commit refactors AbstractStickyAssignor without changing any logic, to make it easier to add rack-awareness. The class currently consists of a lot of collections that are passed around various methods, with some methods updating some collections. Adding rack-awareness would make this class, which already has very large methods, even more complex and harder to read. The new code moves the two assignment methods into their own classes so that the state can be maintained as instance fields rather than local variables.
Reviewers: David Jacot <djacot@confluent.io>
The Fetcher class is used internally by the KafkaConsumer to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored Fetcher.
This task includes refactoring Fetcher by extracting out the inner classes into top-level (though still in internal) so that those classes can be referenced by forthcoming refactored fetch logic.
Reviewers: Philip Nee <philipnee@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Minor changes to `Sender` and `NetworkClient` so that we can log timeouts during `ProduceRequest` with a more precise error message, denoting a timeout vs. "generic" network error.
Reviewers: Philip Nee <pnee@confluent.io>, Guozhang Wang <guozhang@apache.org>, David Jacot <djacot@confluent.io>
A binary value (array of bytes) can be a BinaryNode or a TextNode. When it is a BinaryNode, the method binaryValue() always returns non-null. When it is a TextNode, the method binaryValue() will return non-null if the value is a base64 string. For all other JSON nodes binaryValue() returns null.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
The goal of this PR is to add more tests to the PrototypeAsyncConsumer to test
* Successful startup and shutdown.
* Commit.
I also added integration tests:
* Test commitAsync()
* Test commitSync()
Note that I still need to implement committed() to test if commitSync() has been successfully committed.
Additional things:
Change KafkaConsumer<K, V> to Consumer<K, V> to use different implementations
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Guozhang Wang <wangguoz@gmail.com>
Part 1 of KIP-890
I've updated the API spec and related classes.
Clients should only be able to send requests up to version 3, and this is enforced by using a client builder.
Version 4 and above requests only require cluster permissions, as they are initiated from other brokers. API version 4 is marked as unstable for now.
I've added tests for the batched requests and for the verifyOnly mode.
Also -- minor change to the KafkaApis method to properly match the request name.
Reviewers: Jason Gustafson <jason@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, David Jacot <djacot@confluent.io>
To avoid mistakes during dynamic broker config updates that could potentially affect clients, we restrict changes that can be performed dynamically without a broker restart. For broker keystore updates, we require the DN to be the same for the old and new certificates, since it could contain host names used for host name verification by clients. DNs are compared using the standard Java implementation of X500Principal.equals(), which compares canonical names. If the tags of fields change from one with a printable string representation to one without, or vice versa, the canonical-name check fails even if the actual name is the same, since the canonical representation converts to hex only for some tags. We can relax the verification to allow dynamic updates in this case by accepting the update if either the canonical name or the RFC2253 string representation of the DN matches.
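The relaxed comparison can be sketched with the standard `javax.security.auth.x500.X500Principal` API. This is an illustrative sketch, not the actual broker code; the helper name is invented:

```java
import javax.security.auth.x500.X500Principal;

public class DnMatch {
    // Accept the keystore update if either the canonical form matches
    // (X500Principal.equals compares canonical names) or the RFC2253
    // string representation matches.
    static boolean dnMatches(X500Principal oldDn, X500Principal newDn) {
        return oldDn.equals(newDn)
            || oldDn.getName(X500Principal.RFC2253)
                    .equals(newDn.getName(X500Principal.RFC2253));
    }
}
```

The interesting case is a DN whose canonical form differs (e.g. due to hex-encoding of fields without a printable-string representation) while the RFC2253 form is identical; the second clause accepts it.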
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Kalpesh Patel <kpatel@confluent.io>
This commit adds support to store the SCRAM credentials in a cluster with KRaft quorum servers and
no ZK cluster backing the metadata. This includes creating ScramControlManager in the controller,
and adding support for SCRAM to MetadataImage and MetadataDelta.
Change UserScramCredentialRecord to contain only a single tuple (name, mechanism, salt, pw, iter)
rather than a mapping between name and a list. This will avoid creating an excessively large record
if a single user has many entries. Because record ID 11 (UserScramCredentialRecord) has not been
used before, this is a compatible change. SCRAM will be supported in 3.5-IV0 and later.
This commit does not include KIP-900 SCRAM bootstrapping support, or updating the credential cache
on the controller (as opposed to broker). We will implement these in follow-on commits.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Throw an exception directly from the foreground thread's callers upon abnormal exit of the heartbeat thread.
Reviewers: Luke Chen <showuon@gmail.com>, Philip Nee <philipnee@gmail.com>
Best-effort rack alignment for range assignor when both consumer racks and partition racks are available with the protocol changes introduced in KIP-881. Rack-aware assignment is enabled by configuring client.rack for consumers. Balanced assignment per topic is prioritized over rack-alignment. For topics with equal partitions and the same set of subscribers, co-partitioning is prioritized over rack-alignment.
Reviewers: David Jacot <djacot@confluent.io>
In AbstractCoordinator#joinGroupIfNeeded, the joinGroup request is retried without proper backoff due to the expired timer. This is an uncommon scenario and possibly only appears during testing, but I think it makes sense to require the client to drive the join group via poll.
Reviewers: Guozhang Wang <wangguoz@gmail.com>