Implement request handling for the updated versions of the KRaft RPCs (Fetch, FetchSnapshot, Vote,
BeginQuorumEpoch and EndQuorumEpoch). This doesn't add support for KRaft replicas to send the new
version of the KRaft RPCs. That will be implemented in KAFKA-16529.
All of the RPC responses were extended to include the leader's endpoint for the listener of the
channel used in the request. EpochState was extended to include the leader's endpoint information
but only the FollowerState and LeaderState know the leader id and its endpoint(s).
For the Fetch request, the replica directory id was added. The leader now tracks the follower's log
end offset using both the replica id and replica directory id.
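As a rough sketch of that bookkeeping (the class and field names below are illustrative, not the actual LeaderState code), the leader can key follower fetch state by the pair of replica id and replica directory id instead of the id alone:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative sketch: key the leader's follower bookkeeping by replica id plus
// replica directory id, so two replicas with the same id but different log
// directories are tracked independently.
final class LeaderFetchTrackerSketch {
    record ReplicaKey(int id, UUID directoryId) { }

    private final Map<ReplicaKey, Long> logEndOffsets = new HashMap<>();

    // Called when the leader handles a Fetch request from a follower.
    void onFetch(int replicaId, UUID replicaDirectoryId, long fetchOffset) {
        logEndOffsets.put(new ReplicaKey(replicaId, replicaDirectoryId), fetchOffset);
    }

    long logEndOffset(int replicaId, UUID replicaDirectoryId) {
        return logEndOffsets.getOrDefault(new ReplicaKey(replicaId, replicaDirectoryId), -1L);
    }
}
```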
For the FetchSnapshot request, the replica directory id was added. This is not used by the KRaft
leader; it is there for consistency with Fetch and to help with debugging.
For the Vote request, the replica keys for both the voter (destination) and the candidate (source)
were added. The voter key is checked for consistency. The candidate key is persisted when the vote
is granted.
For the BeginQuorumEpoch request, all of the leader's endpoints are included. This is needed so
that the voters can return the leader's endpoint for all of the supported listeners.
For the EndQuorumEpoch request, all of the leader's endpoints are included. This is needed so that
the voters can return the leader's endpoint for all of the supported listeners. The successor list
has been extended to include the directory id. Receiving voters can use the entire replica key when
searching for their position in the successor list.
Updated the existing tests in KafkaRaftClientTest and KafkaRaftClientSnapshotTest to execute using
both the old and new versions of the RPCs.
Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
In the check-quorum calculation, the leader should not assume that it is part of the voter set. This may happen when the leader is removing itself from the voter set. This PR improves the calculation by checking whether the leader is in the voter set and deciding how many follower fetches are required accordingly. Also adds tests.
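A minimal sketch of the counting rule (illustrative names, not the actual LeaderState code): the number of remote voter fetches required depends on whether the leader itself is still a voter.

```java
import java.util.Set;

// Illustrative sketch of the check-quorum counting rule described above.
final class CheckQuorumMathSketch {
    // Number of distinct remote voters that must have fetched recently.
    // If the leader is still in the voter set it counts toward the majority
    // itself, so one fewer remote fetch is required.
    static int requiredRemoteFetches(Set<Integer> voters, int leaderId) {
        int majority = voters.size() / 2 + 1;
        return voters.contains(leaderId) ? majority - 1 : majority;
    }

    public static void main(String[] args) {
        System.out.println(requiredRemoteFetches(Set.of(1, 2, 3), 1)); // leader is a voter: 1
        System.out.println(requiredRemoteFetches(Set.of(2, 3, 4), 1)); // leader removed itself: 2
    }
}
```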
Co-authored-by: Colin P. McCabe <cmccabe@apache.org>
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, José Armando García Sancio <jsancio@apache.org>
Add support for KIP-953 KRaft Quorum reconfiguration in the DescribeQuorum request and response.
Also add support to AdminClient.describeQuorum, so that users will be able to find the current set of
quorum nodes, as well as their directories, via these RPCs.
Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Andrew Schofield <aschofield@confluent.io>
1. Changed the log message from error to info - we may expect the HW calculation to give us a smaller result than the current HW in the case of quorum reconfiguration. We will continue to not allow the HW to actually decrease.
2. The logic for finding the updated LeaderEndOffset for updateReplicaState is changed as well. We do not assume the leader is in the voter set and also check the observer states.
3. updateLocalState now accepts an additional "lastVoterSet" param which allows us to update the leader state with the last known voters. Any nodes in this set but not in voterStates will be added to voterStates and removed from observerStates; any nodes not in this set but in voterStates will be removed from voterStates and added to observerStates.
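A rough sketch of that reconciliation (map names follow the description above; the actual LeaderState fields and types differ):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of reconciling the tracked replica maps against the last
// known voter set, as described in item 3 above.
final class ReplicaStateReconcilerSketch {
    static final class ReplicaState { }

    static void reconcile(Set<Integer> lastVoterSet,
                          Map<Integer, ReplicaState> voterStates,
                          Map<Integer, ReplicaState> observerStates) {
        // Nodes in the last voter set but missing from voterStates are promoted.
        for (Integer voterId : lastVoterSet) {
            if (!voterStates.containsKey(voterId)) {
                ReplicaState state = observerStates.remove(voterId);
                voterStates.put(voterId, state != null ? state : new ReplicaState());
            }
        }
        // Nodes tracked as voters but no longer in the last voter set are demoted.
        voterStates.keySet().removeIf(id -> {
            if (!lastVoterSet.contains(id)) {
                observerStates.put(id, voterStates.get(id));
                return true;
            }
            return false;
        });
    }
}
```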
Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@apache.org>
Allow KRaft replicas to send requests to any node (Node), not just the nodes configured in the
controller.quorum.voters property. This flexibility is needed so KRaft can implement the
controller.quorum.bootstrap.servers configuration, send requests to the dynamically changing set of
voters and send requests to the leader endpoint (Node) discovered through the KRaft RPCs
(specifically the BeginQuorumEpoch request and Fetch response).
This was achieved by changing the RequestManager API to accept Node instead of just the replica ID.
Internally, the request manager tracks connection state using the Node.idString method to match the
connection management used by NetworkClient.
The API for RequestManager is also changed so that the ConnectState class is not exposed in the
API. This allows the request manager to reclaim heap memory for any connection that is ready.
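Conceptually, the request manager now looks something like the following simplified sketch (not the actual RequestManager API), keying connection state by the same string NetworkClient uses and dropping entries once a connection is ready again:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.Node;

// Simplified sketch of connection tracking keyed by Node.idString(), matching
// how NetworkClient identifies connections. ConnectState stays internal.
final class RequestManagerSketch {
    private static final class ConnectState {
        final long backoffDeadlineMs;
        ConnectState(long backoffDeadlineMs) { this.backoffDeadlineMs = backoffDeadlineMs; }
    }

    private final Map<String, ConnectState> states = new HashMap<>();

    void onRequestFailed(Node node, long nowMs, long backoffMs) {
        states.put(node.idString(), new ConnectState(nowMs + backoffMs));
    }

    boolean isReady(Node node, long nowMs) {
        ConnectState state = states.get(node.idString());
        if (state == null || nowMs >= state.backoffDeadlineMs) {
            // Reclaim heap memory for connections that are ready again.
            states.remove(node.idString());
            return true;
        }
        return false;
    }
}
```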
The NetworkChannel was updated to receive the endpoint information (Node) through the outbound raft
request (RaftRequest.Outbound). This makes the network channel more flexible as it doesn't need to
be configured with the list of all possible endpoints. RaftRequest.Outbound and
RaftResponse.Inbound were updated to include the remote node instead of just the remote id.
The follower state tracked by KRaft replicas was updated to include both the leader id and the
leader's endpoint (Node). In this commit the node value is computed from the set of voters. In a
future commit this will be updated so that it is sent through the KRaft RPCs, for example the
BeginQuorumEpoch request and Fetch response.
Support for configuring controller.quorum.bootstrap.servers was added. This includes changes to
KafkaConfig, QuorumConfig, etc. All of the tests using QuorumTestHarness were changed to use the
controller.quorum.bootstrap.servers instead of the controller.quorum.voters for the broker
configuration. Finally, the node ids for the bootstrap servers will be decreasing negative numbers
starting with -2.
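For illustration (a hypothetical helper, not the actual QuorumConfig code), the parsed controller.quorum.bootstrap.servers entries can be mapped to Node objects with ids -2, -3, -4, and so on:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.Node;

// Hypothetical helper: assign synthetic, decreasing negative ids starting at -2
// to the bootstrap servers so they never collide with real (non-negative) replica ids.
final class BootstrapNodesSketch {
    static List<Node> fromHostPorts(List<String> hostPorts) {
        List<Node> nodes = new ArrayList<>();
        int id = -2;
        for (String hostPort : hostPorts) {
            int colon = hostPort.lastIndexOf(':');
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            nodes.add(new Node(id--, host, port));
        }
        return nodes;
    }
}
```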
Reviewers: Jason Gustafson <jason@confluent.io>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Fix the code in the RaftControllerNodeProvider to query RaftManager to find Node information,
rather than consulting a static map. Add a RaftManager.voterNode function to supply this
information. In KRaftClusterTest, add testControllerFailover to get more coverage of controller
failovers.
Reviewers: José Armando García Sancio <jsancio@apache.org>
Allow KRaft replicas to read and write versions 0 and 1 of the quorum-state file. Which version is written is controlled by kraft.version: with kraft.version 0, version 0 of the quorum-state file is written; with kraft.version 1, version 1 is written. Version 1 of the quorum-state file adds the VotedDirectoryId field and removes the CurrentVoters field. The other fields removed in version 1 are not important as they were not overwritten or used by KRaft.
In kraft.version 1 the set of voters will be stored in the kraft partition log segments and snapshots.
To implement this feature the following changes were made to KRaft.
FileBasedStateStore was renamed to FileQuorumStateStore to better match the name of the implemented interface QuorumStateStore.
The QuorumStateStore::writeElectionState method was extended to include the kraft.version. This version is used to determine which version of QuorumStateData to store. When writing version 0 the VotedDirectoryId is not persisted but the latest value is kept in-memory. This allows the replica to vote consistently while it stays online. If a replica restarts in the middle of an election it will forget the VotedDirectoryId if the kraft.version is 0. This should be rare in practice and should only happen if there is an election and a failure while the system is upgrading to kraft.version 1.
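A hedged sketch of that version selection (the field names and JSON layout below are approximations, not the real QuorumStateData schema):

```java
import java.util.Optional;
import java.util.UUID;

// Approximate sketch only; the real FileQuorumStateStore and QuorumStateData differ.
final class QuorumStateWriterSketch {
    static String serialize(int leaderEpoch,
                            Optional<Integer> votedId,
                            Optional<UUID> votedDirectoryId,
                            short kraftVersion) {
        // kraft.version 0 writes quorum-state file version 0; kraft.version 1 writes version 1.
        short dataVersion = kraftVersion == 0 ? (short) 0 : (short) 1;
        StringBuilder json = new StringBuilder()
            .append("{\"version\":").append(dataVersion)
            .append(",\"leaderEpoch\":").append(leaderEpoch)
            .append(",\"votedId\":").append(votedId.orElse(-1));
        if (dataVersion >= 1) {
            // Only version 1 persists the voted directory id; with kraft.version 0 the
            // latest value is kept in memory and forgotten on restart.
            json.append(",\"votedDirectoryId\":\"")
                .append(votedDirectoryId.map(UUID::toString).orElse(""))
                .append('"');
        }
        return json.append('}').toString();
    }
}
```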
The type ElectionState, the interface EpochState and all of the implementations of EpochState (VotedState, UnattachedState, FollowerState, ResignedState, CandidateState and LeaderState) are extended to support the new voted directory id.
The type QuorumState is changed so that the local directory id is used. The type is also changed so that the latest values for the set of voters and the kraft version are queried from the KRaftControlRecordStateMachine.
The replica directory id is read from the meta.properties and passed to the KafkaRaftClient. The replica directory id is guaranteed to be set in the local replica.
Adds a new metric for current-vote-directory-id which exposes the latest in-memory value of the voted directory id.
Renames VoterSet.VoterKey to ReplicaKey.
It is important to note that after this change, version 1 of the quorum-state file will not yet be written by KRaft controllers and brokers. This change adds support for reading and writing version 1 of the file in preparation for future changes.
Reviewers: Jun Rao <junrao@apache.org>
Validate that a control batch in the batch accumulator has at least one control record.
Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai <chia7712@apache.org>
Adds support for the KafkaRaftClient to read the control records KRaftVersionRecord and VotersRecord in the snapshot and log. As the control records in the KRaft partition are read, the replica's known set of voters is updated. This change also contains the necessary changes to include the control records when a snapshot is generated by the KRaft state machine.
It is important to note that this commit changes the code and the in-memory state to track the sets of voters but it doesn't change any data that is externally exposed. It doesn't change the RPCs, data stored on disk or configuration.
When the KRaft replica starts the PartitionListener reads the latest snapshot and then the log segments up to the LEO, updating the in-memory state as it reads KRaftVersionRecord and VotersRecord. When the replica (leader and follower) appends to the log, the PartitionListener catches up to the new LEO. When the replica truncates the log because of a diverging epoch, the PartitionListener also truncates the in-memory state to the new LEO. When the state machine generates a new snapshot the PartitionListener trims any prefix entries that are not needed. This is all done to minimize the amount of data tracked in-memory and to make sure that it matches the state on disk.
To implement the functionality described above this commit also makes the following changes:
Adds control records for KRaftVersionRecord and VotersRecord. KRaftVersionRecord describes the finalized kraft.version supported by all of the replicas. VotersRecord describes the set of voters at a specific offset.
Changes Kafka's feature version to support 0 as the smallest valid value. This is needed because the default value for kraft.version is 0.
Refactors FileRawSnapshotWriter so that it doesn't directly call the onSnapshotFrozen callback. It adds NotifyingRawSnapshotWriter for calling such callbacks. This reorganization is needed because in this change both the KafkaMetadataLog and the KafkaRaftClient need to react to snapshots getting frozen.
Cleans up KafkaRaftClient's initialization. Removes initialize from RaftClient - this is an implementation detail that doesn't need to be exposed in the interface. Removes RaftConfig.AddressSpec and simplifies the bootstrapping of the static voter's address. The bootstrapping of the address is delayed because of tests. We should be able to simplify this further in future commits.
Update the DumpLogSegment CLI to support the new control records KRaftVersionRecord and VotersRecord.
Fix the RecordsSnapshotReader implementations so that the iterator includes control records. RecordsIterator is extended to support reading the new control records.
Improve the BatchAccumulator implementation to allow multiple control records in one control batch. This is needed so that KRaft can make sure that VotersRecord is included in the same batch as the control record (KRaftVersionRecord) that upgrades the kraft.version to 1.
Add a History interface and default implementation TreeMapHistory. This is used to track all of the sets of voters between the latest snapshot and the LEO. This is needed so that KafkaRaftClient can query for the latest set of voters and so that KafkaRaftClient can include the correct set of voters when the state machine generates a new snapshot at a given offset.
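A minimal sketch of such an offset-indexed history (the actual History/TreeMapHistory API in the raft module may differ):

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Minimal sketch of an offset-indexed history of values such as voter sets.
final class TreeMapHistorySketch<T> {
    private final TreeMap<Long, T> history = new TreeMap<>();

    // Record the value that takes effect at the given offset.
    void addAt(long offset, T value) {
        history.put(offset, value);
    }

    // Latest value at or before the given offset, e.g. the voter set to embed
    // in a snapshot that ends at that offset.
    Optional<T> valueAtOrBefore(long offset) {
        Map.Entry<Long, T> entry = history.floorEntry(offset);
        return Optional.ofNullable(entry == null ? null : entry.getValue());
    }

    // After a snapshot at snapshotOffset is generated, drop prefix entries that are
    // no longer needed, keeping the last entry at or before the snapshot offset.
    void truncateOldEntries(long snapshotOffset) {
        Map.Entry<Long, T> keep = history.floorEntry(snapshotOffset);
        if (keep != null) {
            history.headMap(keep.getKey(), false).clear();
        }
    }

    // After the log truncates because of a diverging epoch, remove newer entries.
    void truncateNewEntries(long divergingOffset) {
        history.tailMap(divergingOffset, true).clear();
    }
}
```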
Add a builder pattern for RecordsSnapshotWriter. The new builder also supports including the KRaftVersionRecord and VotersRecord control records in the snapshot as necessary. A KRaftVersionRecord should be appended if the kraft.version is greater than 0 at the snapshot's offset. Similarly, a VotersRecord should be appended to the snapshot with the latest value up to the snapshot's offset.
Reviewers: Jason Gustafson <jason@confluent.io>
KRaft was only notifying listeners of the latest leader and epoch when the replica transitioned to a new state. This could result in a listener never getting notified if its registration happened after the replica had become a follower.
This problem doesn't exist for the active leader because the KRaft implementation attempts to notify the listener of the latest leader and epoch when the replica is the active leader.
This issue is fixed by notifying the listeners of the latest leader and epoch after processing the listener registration request.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
When there's only 1 voter, there will be no fetch request from other voters. In this case, we should still not expire the checkQuorum timer because there's just 1 voter.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Federico Valeri <fedevaleri@gmail.com>, José Armando García Sancio <jsancio@apache.org>
This patch moves the `RaftIOThread` implementation into Java. I changed the name to `KafkaRaftClientDriver` since the main thing it does is drive the calls to `poll()`. There shouldn't be any changes to the logic.
Reviewers: José Armando García Sancio <jsancio@apache.org>
* MINOR: Minor cleanup of KRaft code
Clean up minor issues with KRaft code.
Add batch info, excluding the records, to ease troubleshooting in RecordsSnapshotReader#lastContainedLogTimestamp
Reviewers: Luke Chen <showuon@gmail.com>
Signed-off-by: Josep Prat <josep.prat@aiven.io>
In KIP-595, we expect to piggy-back on the `quorum.fetch.timeout.ms` config: if the leader did not receive Fetch requests from a majority of the quorum for that amount of time, it would begin a new election, to resolve a network partition in the quorum. But this was missing from the current KRaft implementation. This PR fixes it.
The commit includes:
1. Added a timer with the timeout configuration in `LeaderState`, and check whether it has expired each time the leader is polled. If expired, the leader resigns and starts a new election.
2. Added `fetchedVoters` in `LeaderState`, updated each time a FETCH or FETCH_SNAPSHOT request is received from a voter; it is cleared and the timer is reset once a majority - 1 of the remote voters have sent such requests.
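A condensed sketch of that bookkeeping (illustrative; the real `LeaderState` uses Kafka's timer utilities and differs in detail):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the leader-side check-quorum bookkeeping described above.
final class CheckQuorumTrackerSketch {
    private final Set<Integer> fetchedVoters = new HashSet<>();
    private final long fetchTimeoutMs;
    private long deadlineMs;

    CheckQuorumTrackerSketch(long fetchTimeoutMs, long nowMs) {
        this.fetchTimeoutMs = fetchTimeoutMs;
        this.deadlineMs = nowMs + fetchTimeoutMs;
    }

    // Called when a FETCH or FETCH_SNAPSHOT request is received from a voter.
    void recordFetch(int voterId, Set<Integer> voters, int leaderId, long nowMs) {
        if (voters.contains(voterId) && voterId != leaderId) {
            fetchedVoters.add(voterId);
        }
        int majority = voters.size() / 2 + 1;
        // The leader counts itself, so majority - 1 remote voters are enough.
        if (fetchedVoters.size() >= majority - 1) {
            fetchedVoters.clear();
            deadlineMs = nowMs + fetchTimeoutMs;
        }
    }

    // Checked on every poll; if true the leader resigns and starts a new election.
    boolean hasExpired(long nowMs) {
        return nowMs >= deadlineMs;
    }
}
```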
Reviewers: José Armando García Sancio <jsancio@apache.org>
This is now possible since `InterBrokerSend` was moved from `core` to `server-common`.
Also rewrite/move `KafkaNetworkChannelTest`.
The Scala version of `KafkaNetworkChannelTest` passed with the changes here (before I
deleted it).
Reviewers: Justine Olshan <jolshan@confluent.io>, José Armando García Sancio <jsancio@users.noreply.github.com>
Spotbugs was temporarily disabled as part of KAFKA-15485 to support building Kafka with JDK 21. This PR upgrades the spotbugs version to 4.8.0, which adds support for JDK 21, and enables its usage in the build again.
Reviewers: Divij Vaidya <diviv@amazon.com>
This is one of the steps required for Kafka to compile with Java 21.
For each case, one of the following fixes was applied:
1. Suppress warning if fixing would potentially result in an incompatible change (for public classes)
2. Add final to one or more methods so that the escape is not possible
3. Replace method calls with direct field access.
In addition, we also fix a couple of compiler warnings related to deprecated references in the `core` module.
See the following for more details regarding the new lint warning:
https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Chris Egerton <chrise@aiven.io>
On the Windows OS an atomic move is not allowed if the file has another open handle, e.g.
__cluster_metadata-0\quorum-state: The process cannot access the file because it is being used by another process
at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
at java.base/java.nio.file.Files.move(Files.java:1430)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:949)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:932)
at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
This is fixed by first closing the temporary quorum-state file before attempting to move it.
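A hedged sketch of the fixed write path (simplified; the real FileBasedStateStore uses Utils.atomicMoveWithFallback and handles errors differently):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Simplified sketch: write to a temporary file, close the handle, then atomically
// move it into place. Closing before the move is what makes this work on Windows.
final class QuorumStateFileWriterSketch {
    static void writeAtomically(Path target, String content) throws IOException {
        Path temp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileChannel channel = FileChannel.open(temp,
                StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)));
            channel.force(true); // flush to disk before the handle is released
        } // the temporary file's handle is closed here
        Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```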
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
Co-Authored-By: Renaldo Baur Filho <renaldobf@gmail.com>
Reading an unknown version of the quorum-state file should trigger an error. Currently the only known version is 0, so reading any other version should cause an error.
Reviewers: Justine Olshan <jolshan@confluent.io>, Luke Chen <showuon@gmail.com>
In a non-empty log the KRaft leader only notifies the listener of leadership when it has read to the leader's epoch start offset. This guarantees that the leader epoch has been committed and that the listener has read all committed offsets/records.
Unfortunately, the KRaft leader doesn't do this when the log is empty. When the log is empty the listener is notified immediately when it has become leader. This makes the API inconsistent and harder to program against.
This change fixes that by having the KRaft leader wait for the listener's nextOffset to be greater than the leader's epochStartOffset before calling handleLeaderChange.
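In illustrative form (not the actual KafkaRaftClient code), the notification is gated on the listener having caught up past the epoch start offset:

```java
// Illustrative gate, not the actual KafkaRaftClient code: only notify a listener
// of leadership once it has read past the leader's epoch start offset, so the
// behavior is the same for empty and non-empty logs.
final class LeaderChangeNotifierSketch {
    interface Listener {
        long nextOffset();
        void handleLeaderChange(int leaderId, int epoch);
    }

    static boolean maybeFireLeaderChange(Listener listener, int leaderId, int epoch,
                                         long epochStartOffset) {
        if (listener.nextOffset() > epochStartOffset) {
            listener.handleLeaderChange(leaderId, epoch);
            return true;
        }
        return false; // try again after the listener has read more committed records
    }
}
```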
The RecordsBatchReader implementation is also changed to include control records. This makes it possible for the state machine to learn about committed control records. This additional information can be used to compute the committed offset or for counting those bytes when determining when to snapshot the partition.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
The KRaft client uses an expiration service to complete FETCH requests that have timed out. This expiration service uses a different thread from the KRaft polling thread. This means that it is unsafe for the expiration service thread to call tryCompleteFetchRequest. tryCompleteFetchRequest reads and updates a lot of state that is assumed to only be read and updated from the polling thread.
The KRaft client now does not call tryCompleteFetchRequest when the FETCH request has expired. It instead will send the FETCH response that was computed when the FETCH request was first handled.
This change also fixes a bug where the KRaft client was not sending the FETCH response immediately, if the response contained a diverging epoch or snapshot id.
Reviewers: Jason Gustafson <jason@confluent.io>
On ext4 file systems we have seen snapshots with zero-length files. This is possible if
the file is closed and moved before forcing the channel to write to disk.
Reviewers: Ron Dagostino <rndgstn@gmail.com>, Alok Thatikunta <athatikunta@confluent.io>
Provide the exact record offset to QuorumController.replay() in all cases. There are several situations
where this is useful, such as logging, implementing metadata transactions, or handling broker
registration records.
In the case where the QC is inactive, and simply replaying records, it is easy to compute the exact
record offset from the batch base offset and the record index.
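For example (a trivial sketch of the arithmetic, not QuorumController code):

```java
// The exact offset of a replayed record is the batch's base offset plus the
// record's index within that batch.
final class RecordOffsetsSketch {
    static long exactOffset(long batchBaseOffset, int recordIndexInBatch) {
        return batchBaseOffset + recordIndexInBatch;
    }

    public static void main(String[] args) {
        // The third record (index 2) of a batch starting at offset 100 is at offset 102.
        System.out.println(exactOffset(100L, 2)); // 102
    }
}
```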
The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can
choose a batch base offset later than the one we expect, if someone else is also adding records.
While the QC is the only entity submitting data records, control records may be added at any time.
In the current implementation, these are really only used for leadership elections. However, this
could change with the addition of quorum reconfiguration or similar features.
Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it
would have resulted in a batch base offset other than what was expected. This in turn will trigger a
controller failover. In the future, if automatically added control records become more common, we
may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But
for now, this will allow us to rely on the offset as correct.
In order that the active QC can learn what offset to start writing at, the PR also adds a new
RaftClient#endOffset function.
At the Raft level, this PR adds a new exception, UnexpectedBaseOffsetException. This gets thrown
when we request a base offset that doesn't match the one the Raft layer would have given us.
Although this exception should cause a failover, it should not be considered a fault. This
complicated the exception handling a bit and motivated splitting more of it out into the new
EventHandlerExceptionInfo class. This will also let us unit test things like slf4j log messages a
bit better.
Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org>
Implement some of the metrics from KIP-938: Add more metrics for
measuring KRaft performance.
Add these metrics to QuorumControllerMetrics:
kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
kafka.controller:type=KafkaController,name=NewActiveControllersCount
Create LoaderMetrics with these new metrics:
kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount
Create SnapshotEmitterMetrics with these new metrics:
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs
Reviewers: Ron Dagostino <rndgstn@gmail.com>
If the follower has an empty log and fetches with offset 0, it is more
efficient for the leader to reply with a snapshot id (redirecting to
FETCH_SNAPSHOT) than for the follower to continue fetching from the log
segments.
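Schematically (a sketch of the decision only, not the actual leader code):

```java
import java.util.Optional;

// Sketch: if a follower fetches from offset 0 and the leader has a snapshot,
// point the follower at the snapshot instead of serving the log from the start.
final class FetchRedirectSketch {
    record SnapshotId(long endOffset, int epoch) { }

    static Optional<SnapshotId> snapshotToSend(long fetchOffset,
                                               Optional<SnapshotId> latestSnapshot) {
        if (fetchOffset == 0 && latestSnapshot.isPresent()) {
            return latestSnapshot; // redirect the follower to FETCH_SNAPSHOT
        }
        return Optional.empty();   // otherwise serve records from the log
    }
}
```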
Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
If the KRaft listener is at offset 0, the start of the log, and KRaft has generated a snapshot, it should prefer the latest snapshot instead of having the listener read from the start of the log.
This is implemented by having KafkaRaftClient send a Listener.handleLoadSnapshot event, if the Listener is at offset 0 and the KRaft partition has generated a snapshot.
Reviewers: Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>
This change refactors lastContainedLogTimestamp into the Snapshots class, for reusability. It introduces IdentitySerde based on ByteBuffer, required for using RecordsSnapshotReader. This change also removes the "recordSerde: RecordSerde[_]" argument from the KafkaMetadataLog constructor.
Reviewers: José Armando García Sancio <jsancio@apache.org>
After this change,
For broker side decompression: JMH benchmark RecordBatchIterationBenchmark demonstrates 20-70% improvement in throughput (see results for RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize).
For consumer side decompression: JMH benchmark RecordBatchIterationBenchmark shows a mixed bag, from single-digit regressions for some compression types to 10-50% improvement for Zstd (see results for RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize).
Reviewers: Luke Chen <showuon@gmail.com>, Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, Ismael Juma <mail@ismaeljuma.com>
Rename handleSnapshot to handleLoadSnapshot to make it explicit that it is handling snapshot load,
not generation.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
Currently, the current-state KRaft metric reports follower state for a broker, while technically it should be reported as an observer, as the kafka-metadata-quorum tool does.
Reviewers: Luke Chen <showuon@gmail.com>, dengziming <dengziming1993@gmail.com>