Allow KRaft replicas to read and write version 0 and 1 of the quorum-state file. Which version is written is controlled by the kraft.version. With kraft.version 0, version 0 of the quorum-state file is written. With kraft.version 1, version 1 of the quorum-state file is written. Version 1 of the quorum-state file adds the VotedDirectoryId field and removes the CurrentVoters. The other fields removed in version 1 are not important as they were not overwritten or used by KRaft.
In kraft.version 1 the set of voters will be stored in the kraft partition log segments and snapshots.
To implement this feature the following changes were made to KRaft.
FileBasedStateStore was renamed to FileQuorumStateStore to better match the name of the implemented interface QuorumStateStore.
The QuorumStateStore::writeElectionState was extended to include the kraft.version. This version is used to determine which version of QuorumStateData to store. When writing version 0 the VotedDirectoryId is not persisted but the latest value is kept in-memory. This allows the replica to vote consistently while they stay online. If a replica restarts in the middle of an election it will forget the VotedDirectoryId if the kraft.version is 0. This should be rare in practice and should only happen if there is an election and failure while the system is upgrading to kraft.version 1.
The type ElectionState, the interface EpochState and all of the implementations of EpochState (VotedState, UnattachedState, FollowerState, ResignedState, CandidateState and LeaderState) are extended to support the new voted directory id.
The type QuorumState is changed so that local directory id is used. The type is also changed so that the latest value for the set of voters and the kraft version is query from the KRaftControlRecordStateMachine.
The replica directory id is read from the meta.properties and passed to the KafkaRaftClient. The replica directory id is guaranteed to be set in the local replica.
Adds a new metric for current-vote-directory-id which exposes the latest in-memory value of the voted directory id.
Renames VoterSet.VoterKey to ReplicaKey.
It is important to note that after this change, version 1 of the quorum-state file will not be written by kraft controllers and brokers. This change adds support reading and writing version 1 of the file in preparation for future changes.
Reviewers: Jun Rao <junrao@apache.org>
We need to add an example test to the PlaintextConsumerTest that tests a common
ConsumerRebalanceListener use case. For example, create an integration test that
invokes the Consumer API to commit offsets in the onPartitionsRevoked callback.
This test is implemented in a reasonably general way with a view to using it as a
template from which other tests can be created later. Eventually we will need to
have a comprehensive set of tests that cover all the basic use cases.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Bruno Cadonna <cadonna@apache.org>
This patch deprecates `offsets.commit.required.acks` in Apache Kafka 3.8 as described in KIP-1041: https://cwiki.apache.org/confluence/x/9YobEg.
Reviewers: Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
KIP-227 introduced in-memory caching of FetchSessions. Brokers with a large number of Fetch requests suffer from contention on trying to acquire a lock on FetchSessionCache.
This change aims to reduce lock contention for FetchSessionCache by sharding the cache into multiple segments, each responsible for an equal range of sessionIds. Assuming Fetch requests have a uniform distribution of sessionIds, the probability of contention on a segment is reduced by a factor of the number of segments.
We ensure backwards compatibility by ensuring total number of cache entries remain the same as configured and sessionIds are randomly allocated.
Reviewers: Igor Soarez <soarez@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>
Introduce a new field id in annotation ClusterConfigProperty. The main purpose of new field is to define specific broker/controller(kraft) property. And the default value is -1 which means the ClusterConfigProperty will apply to all broker/controller.
Note that under Type.KRAFT mode, the controller id starts from 3000, and then increments by one each time. Other modes the broker/controller id starts from 0 and then increments by one.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
As a part of 2e8d69b78c, we had introduced the TransactionAbortableException in AK. On more detailed analysis we figured out that the enum SupportedOperation was a bit misleading. Hence updated the same to TransactionSupportedOperation to allow a better and more defined function signature
Reviewers: Justine Olshan <jolshan@confluent.io>
When there are overlapping segments in the remote storage, then the deletion may fail to remove the segments due to isRemoteSegmentWithinLeaderEpochs check. Once the deletion starts to fail for a partition, then segments won't be eligible for cleanup. The one workaround that we have is to move the log-start-offset using the kafka-delete-records script.
Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>
Fix to allow to initialize positions for newly assigned partitions, while the onPartitionsAssigned callback is running, even though the partitions remain non-fetchable until the callback completes.
Before this PR, we were not allowing initialization or fetching while the callback was running. The fix here only allows to initialize the newly assigned partition position, and keeps the existing logic for making sure that the partition remains non-fetchable until the callback completes.
The need for this fix came out in one of the connect system tests, that attempts to retrieve a newly assigned partition position with a call to consumer.position from within the onPartitionsAssigned callback (WorkerSinkTask). With this PR, we allow to make such calls (test added), which is the behaviour of the legacy consumer.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
The patch implements JoinGroup API for the new consumer groups. It allow members using the classic rebalance protocol with the consumer embedded protocol to join a new consumer group.
Reviewers: David Jacot <djacot@confluent.io>
When the consumer group protocol is used in a cluster, it is, at the moment, impossible to see all records stored in the __consumer_offsets topic with kafka-dump-log --offsets-decoder. It does not know how to handle all the new records.
This patch refactors the OffsetsMessageParser used internally by kafka-dump-log to use the RecordSerde used by the new group coordinator. It ensures that the tool is always in sync with the coordinator implementation. The patch also changes the format to using the toString'ed representations of the records instead of having custom logic to dump them. It ensures that all the information is always dumped. The downside of the latest is that inner byte arrays (e.g. assignment in the classic protocol) are no longer deserialized. Personally, I feel like that it is acceptable and it is actually better to stay as close as possible to the actual records in this tool. It also avoids issues like https://issues.apache.org/jira/browse/KAFKA-15603.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This patch fixes two issues with IncrementalAlterConfigs and the ZK migration. First, it changes the handling of IncrementalAlterConfigs to check if the controller is ZK vs KRaft and only forward for KRaft. Second, it adds a check in KafkaZkClient#setOrCreateEntityConfigs to ensure a ZK broker is not directly modifying configs in ZK if there is a KRaft controller. This closes the race condition between KRaft taking over as the active controller and the ZK brokers learning about this.
*Forwarding*
During the ZK migration, there is a time when the ZK brokers are running with migrations enabled, but KRaft has yet to take over as the controller. Prior to KRaft taking over as the controller, the ZK brokers in migration mode were unconditionally forwarding IncrementalAlterConfigs (IAC) to the ZK controller. This works for some config types, but breaks when setting BROKER and BROKER_LOGGER configs for a specific broker. The behavior in KafkaApis for IAC was to always forward if the forwarding manager was defined. Since ZK brokers in migration mode have forwarding enabled, the forwarding would happen, and the special logic for BROKER and BROKER_LOGGER would be missed, causing the request to fail.
With this fix, the IAC handler will check if the controller is KRaft or ZK and only forward for KRaft.
*Protected ZK Writes*
As part of KIP-500, we moved most (but not all) ZK mutations to the ZK controller. One of the things we did not move fully to the controller was entity configs. This is because there was some special logic that needed to run on the broker for certain config updates. If a broker-specific config was set, AdminClient would route the request to the proper broker. In KRaft, we have a different mechanism for handling broker-specific config updates.
Leaving this ZK update on the broker side would be okay if we were guarding writes on the controller epoch, but it turns out KafkaZkClient#setOrCreateEntityConfigs does unprotected "last writer wins" updates to ZK. This means a ZK broker could update the contents of ZK after the metadata had been migrated to KRaft. No good! To fix this, this patch adds a check on the controller epoch to KafkaZkClient#setOrCreateEntityConfigs but also adds logic to fail the update if the controller is a KRaft controller.
The new logic in setOrCreateEntityConfigs adds STALE_CONTROLLER_EPOCH as a new exception that can be thrown while updating configs.
Reviewers: Luke Chen <showuon@gmail.com>, Akhilesh Chaganti <akhileshchg@users.noreply.github.com>, Chia-Ping Tsai <chia7712@gmail.com>
Adds support for the KafkaRaftClient to read the control records KRaftVersionRecord and VotersRecord in the snapshot and log. As the control records in the KRaft partition are read, the replica's known set of voters are updated. This change also contains the necessary changes to include the control records when a snapshot is generated by the KRaft state machine.
It is important to note that this commit changes the code and the in-memory state to track the sets of voters but it doesn't change any data that is externally exposed. It doesn't change the RPCs, data stored on disk or configuration.
When the KRaft replica starts the PartitionListener reads the latest snapshot and then log segments up to the LEO, updating the in-memory state as it reads KRaftVersionRecord and VotersRecord. When the replica (leader and follower) appends to the log, the PartitionListener catches up to the new LEO. When the replica truncates the log because of a diverging epoch, the PartitionListener also truncates the in-memory state to the new LEO. When the state machine generate a new snapshot the PartitionListener trims any prefix entries that are not needed. This is all done to minimize the amount of data tracked in-memory and to make sure that it matches the state on disk.
To implement the functionality described above this commit also makes the following changes:
Adds control records for KRaftVersionRecord and VotersRecord. KRaftVersionRecord describes the finalized kraft.version supported by all of the replicas. VotersRecords describes the set of voters at a specific offset.
Changes Kafka's feature version to support 0 as the smallest valid value. This is needed because the default value for kraft.version is 0.
Refactors FileRawSnapshotWriter so that it doesn't directly call the onSnapshotFrozen callback. It adds NotifyingRawSnapshotWriter for calling such callbacks. This reorganization is needed because in this change both the KafkaMetadataLog and the KafkaRaftClient need to react to snapshots getting frozen.
Cleans up KafkaRaftClient's initialization. Removes initialize from RaftClient - this is an implementation detail that doesn't need to be exposed in the interface. Removes RaftConfig.AddressSpec and simplifies the bootstrapping of the static voter's address. The bootstrapping of the address is delayed because of tests. We should be able to simplify this further in future commits.
Update the DumpLogSegment CLI to support the new control records KRaftVersionRecord and VotersRecord.
Fix the RecordsSnapshotReader implementations so that the iterator includes control records. RecordsIterator is extended to support reading the new control records.
Improve the BatchAccumulator implementation to allow multiple control records in one control batch. This is needed so that KRaft can make sure that VotersRecord is included in the same batch as the control record (KRaftVersionRecord) that upgrades the kraft.version to 1.
Add a History interface and default implementation TreeMapHistory. This is used to track all of the sets of voters between the latest snapshot and the LEO. This is needed so that KafkaRaftClient can query for the latest set of voters and so that KafkaRaftClient can include the correct set of voters when the state machine generates a new snapshot at a given offset.
Add a builder pattern for RecordsSnapshotWriter. The new builder pattern also implements including the KRaftVersionRecord and VotersRecord control records in the snapshot as necessary. A KRaftVersionRecord should be appended if the kraft.version is greater than 0 at the snapshot's offset. Similarly, a VotersRecord should be appended to the snapshot with the latest value up to the snapshot's offset.
Reviewers: Jason Gustafson <jason@confluent.io>
The AsyncKafkaConsumer implementation of position(TopicPartition, Duration) was not updating its internal Timer, causing it to execute the loop forever. Adding a call to update the Timer at the bottom of the loop fixes the issue.
An integration test was added to catch this case; it fails without the newly added call to Timer.update(long).
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Instead of entering pending forever, this PR invoke next schedule after 1ms. However, the side effect is busy-waiting. Hence, This PR also update the docs to remind users about that - the issue about smaller log.segment.delete.delay.ms
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This PR do the following cleanup for TestUtils.scala
1) remove unused methods
2) move methods used by single test class out of
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
ControllerRegistrationManagerTest is flaky due to the poll in L221. The potential root cause is a race condition between the first poll (L221) and the second poll (L229). Before the second poll, we mock a response (L226), which should be processed by the second poll. However, if the first poll take this away, the second poll would get nothing, and this could lead to an error.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
* Make ClusterConfig immutable
* Make BrokerNode immutable
* Refactor out build argument in ControllerNode
* Add setPrefix and replace put property with set map in ClusterConfig
* Remove rollingBrokerRestart from ClusterInstance interface
* Refactor KRaftClusterTest#doOnStartedKafkaCluster
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Even if the log start offset is updated, the log deletion might still not completed. Making the test more robust.
Reviewers: Luke Chen <showuon@gmail.com>
If there are some logs to be deleted during the log dir movement, we'll send for a scheduler to do the deletion later.
However, when the log dir movement completed, the future log is renamed, the async log deletion will fail with no file existed error.
Signed-off-by: PoAn Yang <payang@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, SoontaekLim <soontaek.lim@neya.kr>, Johnny Hsu <johnnyhsu@fb.com>
The javadoc for KafkaConsumer.commitSync says:
Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
(or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
This is not always true in the async consumer, where there is no code at all to make sure that the callback is executed before commitSync returns.
Similarly, the async consumer is also missing logic to await callback execution in close. While the javadoc doesn't explicitly promise callback execution, it promises "completing commits", which one would reasonably expect to include callback execution. Also, the legacy consumer contains some code to execute callbacks before closing.
This change proposed a number of fixes to clean up the callback execution guarantees in the async consumer:
We keep track of the incomplete async commit
futures and wait for them to complete before returning from
commitSync or close (if there is time).
Since we need to block to make sure that our previous commits are
completed, we allow the consumer to wake up.
Some similar gaps are addressed in the legacy consumer, see #15693
Testing
Two new integration tests and a couple of unit tests.
Reviewers: Bruno Cadonna <cadonna@apache.org>, Kirk True <ktrue@confluent.io>, Lianet Magrans <lianetmr@gmail.com>
It is observed that for scenario (3), i.e. a broker crashes while it
waits for the future replica to catch up for the second time and the
`dir1` is unavailable when the broker is restarted, the
broker tries to create the partition in `dir2` according to the metadata
in the controller. However, ReplicaManager also tries to resume the
stale future replica which was abandoned when the broker crashed. This
results in the renaming of the future replica to fail eventually because
the directory for the topic partition already exists in `dir2` and the
broker then marks `dir2` as offline.
This PR attempts to fix this behaviour by ignoring any future replicas
which are in the same directory as where the log exists. It further
marks the stale future replica for deletion.
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Igor Soarez <soarez@apple.com>, Proven Provenzano <pprovenzano@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Add the support for DescribeTopicPartitions API to AdminClient. For this initial implementation, we are simply loading all of the results into memory on the client side.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Luke Chen <showuon@gmail.com>, Igor Soarez<soarez@apple.com>
Co-authored-by: lixinyang <nickxyli@tencent.com>
This patch changes the behavior of the migrating ZK broker to always delete the local metadata log
during startup. This deletion is done immediately before creating the RaftManager which will
re-create the log directory and let the broker re-replicate the log from the active controller.
This new behavior is only present for ZK brokers that having migrations enabled. KRaft brokers,
even those with migrations enabled, will not delete their local metadata log. KRaft controllers are
not impacted by this change.
The rationale for this change is to make it easier for operators to re-attempt a ZK to KRaft
migration after having reverted back to ZK mode. If an operator has reverted back to ZK mode, there
will be an invalid metadata log on the disk of each broker. In order to re-attempt the migration in
the future, this log needs to be deleted. This can be pretty burdensome to the operator for large
clusters, especially since the log deletion must be done while the broker is offline.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Igor Soarez <soarez@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>
At the moment it can be a bit difficult to troubleshoot issues related to the AssignmentsManager. Mainly because:
Topic partitions are logged with topic ID and partition index but without the topic name.
Directory IDs are logged without the directory path.
Assignment reasons aren't tracked.
This patch addresses the three issues.
Reviewers: Luke Chen <showuon@gmail.com>
This fixes an issue that when starting a Docker container for the first time, the cluster ID used when formatting the log dir would not be $CLUSTER_ID but Some($CLUSTER_ID) (KAFKA-16473).
In order to be able to test the formatStorageCmd method which contained the bug, the method has been made package private.
Reviewers: cooper.tseng@suse.com, Vedarth Sharma <142404391+VedarthConfluent@users.noreply.github.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
The CurrentControllerId metric added by KIP-1001 is unreliable in ZK
mode. Sometimes when there is no active ZK-based controller, it still
shows the previous controller ID. Instead, it should show -1 in that
situation.
This PR fixes that by using the controller ID from the
KafkaController.scala, which is obtained directly from the controller
znode. It also adds a new test, ControllerIdMetricTest.scala.
Reviewers: David Arthur <mumrah@gmail.com>
This patch adds the `group.consumer.migration.policy` config which controls how consumer groups can be converted from classic group to consumer group and vice versa. The config is kept as an internal one while we develop the feature.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
If a future replica doesn't get promoted, any directory reassignment sent to the controller should be reversed.
The current logic is already addressing the case when a replica hasn't yet been promoted and the controller hasn't yet acknowledged the directory reassignment. However, it doesn't cover the case where the replica does not get promoted due to a directory failure after the controller has acknowledged the reassignment but before the future replica catches up again and is promoted to main replica.
Reviewers: Luke Chen <showuon@gmail.com>
Fully implemented legacy subscription using Pattern for AsyncKafkaConsumer.
Enabled related tests for subscription using Pattern in PlaintextConsumerTest.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>, Bruno Cadonna <cadonna@apache.org>
We do iterate the records to find the offsetOfMaxTimestamp instead of returning the cached one when handling ListOffsetsRequest.MAX_TIMESTAMP, since it is hard to align all paths to get correct offsetOfMaxTimestamp. The known paths are shown below.
1. convertAndAssignOffsetsNonCompressed -> we CAN get correct offsetOfMaxTimestamp when validating all records
2. assignOffsetsNonCompressed -> ditto
3. validateMessagesAndAssignOffsetsCompressed -> ditto
4. validateMessagesAndAssignOffsetsCompressed#buildRecordsAndAssignOffsets -> ditto
5. appendAsFollow#append#analyzeAndValidateRecords -> we CAN'T get correct offsetOfMaxTimestamp as iterating all records is expensive when fetching records from leader
6. LogSegment#recover -> ditto
Reviewers: Jun Rao <junrao@gmail.com>
Sometimes we want to define server properties for all test cases, and using BeforeEach to modify the ClusterConfig is the only way. The side effect is that the IDE does not like the style since IDE can't recognize custom ParameterResolver of ClusterConfig.
The alternative is that we can take ClusterInstance from constructor first, and then we modify the inner ClusterConfig in BeforeEach phase. However, that may confuse users about the life cycle of "configs".
In short, I prefer to define the server property by ClusterTestDefaults. It already includes some server-side default property, and we can enhance that to deal with more existent test case.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Add new method shutdown(Duration) to accept timeout argument. We can leverage the new method to run non-graceful shutdown in testing.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
After profiling the kafka tests, tons of client-metrics-reaper thread not cleanup after BrokerServer shutdown.
The thread client-metrics-reaper comes from ClientMetricsManager#expirationTimer, and BrokerServer#shudown doesn't close ClientMetricsManager which let the thread still runs in background.
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
A subtle difference in the behavior of the two API causes the failures with Invalid negative timestamp.
In this PR, the list offsets response will be processed differently based on the API. For beginingOffsets/endOffsets - the offset response should be directly returned.
For offsetsForTimes - A OffsetAndTimestamp object is constructed for each requested TopicPartition before being returned.
The reason beginningOffsets and endOffsets - We are expecting a -1 timestamp from the response which subsequently causes the invalid timestamp exception because the original code tries to construct an OffsetAndTimestamp object upon returning.
In this PR, the following missing tasks are added:
short-circuit both beginningOrEndOffsets
Test both API (beginningOrEndOffsets, OffsetsForTime)
Seems like we don't have tests for this API: Note it is presented in other IntegrationTests but they are added to test Async consumer
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Lianet Magrans <lianetmr@gmail.com>
- Use `Empty` instead of 'none' when referring to `Optional` values.
- `Headers.lastHeader` returns `null` when no header is found.
- Fix minor spelling mistakes.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This pr fixes the bug created by #15263 which caused topic partition to be recreated whenever the original log dir is offline: Log directory failure re-creates partitions in another logdir automatically
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Igor Soarez <soarez@apple.com>, Gaurav Narula <gaurav_narula2@apple.com>, Proven Provenzano <pprovenzano@confluent.io>