Commit Graph

223 Commits

Author SHA1 Message Date
José Armando García Sancio 7b6d4fbc57
KAFKA-17333; ResignedState should not notify of leader change (#16900)
When a voter fails as leader (LeaderState) the quorum-state still states that it is the leader of
the epoch. When the voter starts it never starts as leader and instead starts as resigned
(ResignedState) if it was previously a leader. This causes the KRaft client to immediately notify
the state machine (e.g QuorumController) that it is leader or active. This is incorrect for two
reasons.

One, the controller cannot be notified of leadership until it has reached the LEO. If the
controller is notified before that it will generate and append records that are not based on the
latest state.

Two, it is not practical to notify of local leadership when it is resigned since any write
operation (prepareAppend and schedulePreparedAppend) will fail with NotLeaderException while KRaft
is in the resigned state.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
2024-08-20 16:24:13 -07:00
Mason Chen fb7e47f6e2
KAFKA-17169: Add EndpointsTest (#16659)
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2024-08-20 15:08:28 -07:00
José Armando García Sancio ee71156295
KAFKA-17332; Controller always flush and can call resign on observers (#16907)
This change includes two improvements.

When the leader removes itself from the voters set clients of RaftClient may call resign. In those cases the leader is not in the voter set and should not throw an exception.

Controllers that are observers must flush the log on every append because leader may be trying to add them to the voter set. Leader always assume that voters flush their disk before sending a Fetch request.

Reviewers: David Arthur <mumrah@gmail.com>, Alyssa Huang <ahuang@confluent.io>
2024-08-19 20:44:23 -04:00
José Armando García Sancio 20c3e7324b
KAFKA-16842; Fix config validation and support unknown voters (#16892)
This change fixes the Kafka configuration validation to take into account the reconfiguration changes to configuration and allows KRaft observers to start with an unknown set of voters.

For the Kafka configuration validation the high-level change is that now the user only needs to specify either the controller.quorum.bootstrap.servers property or the controller.quorum.voters property. The other notable change in the configuration is that controller listeners can now be (and should be) specified in advertise.listeners property.

Because Kafka can now be configured without any voters and just the bootstrap servers. The KRaft client needs to allow for an unknown set of voters during the initial startup. This is done by adding the VoterSet#empty set of voters to the KRaftControlRecordStateMachine.

Lastly the RaftClientTestContext type is updated to support this new configuration for KRaft and a test is added to verify that observers can start and send Fetch requests when the voters are unknown.

Reviewers: David Arthur <mumrah@gmail.com>
2024-08-16 15:53:13 -04:00
TengYao Chi 81f0b13a70
KAFKA-17238 Move VoterSet and ReplicaKey from raft.internals to raft (#16775)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-08-16 00:24:51 +08:00
Josep Prat adaf2d390f
MINOR: Fix visibility for classes exposed outside of their scope (#16886)
These 2 classes are package protected but they are part of the public
API of public methods. To have clean APIs we should make this
consistent.

Static class ReplicaState is exposed in RaftUtil#singletonDescribeQuorumResponse method which is public.

RequestSender is implemented by a public class and it's exposed in the public constructor of AddVoterHandler.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2024-08-15 12:10:10 -04:00
José Armando García Sancio 0f7cd4dcde
KAFKA-17304; Make RaftClient API for writing to log explicit (#16862)
RaftClient API is changed to separate the batch accumulation (RaftClient#prepareAppend) from scheduling the append of accumulated batches (RaftClient#schedulePrepatedAppend) to the KRaft log. This change is needed to better match the controller's flow of replaying the generated records before replicating them. When the controller replay records it needs to know the offset associated with the record. To compute a table offset the KafkaClient needs to be aware of the records and their log position.

The controller uses this new API by generated the cluster metadata records, compute their offset using RaftClient#prepareAppend, replay the records in the state machine, and finally allowing KRaft to append the records with RaftClient#schedulePreparedAppend.

To implement this API the BatchAccumulator is changed to also support this access pattern. This is done by adding a drainOffset to the implementation. The batch accumulator is allowed to return any record and batch that is less than the drain offset.

Lastly, this change also removes some functionality that is no longer needed like non-atomic appends and validation of the base offset.

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
2024-08-14 15:42:04 -04:00
Ken Huang 9a85705b56
KAFKA-17297 stabalize testHandleEndQuorumRequest (#16855)
In old code leaderId and oldLeaderId all use randomReplicaId(), thus it will be flaky when leaderId is equals oldLeaderId. When KafkaRaftClient initialize it will find leaderId and oldLeaderId, so it will tansfer to candidate, and the leaderEpoch will be + 1

Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-08-12 07:08:47 +08:00
Alyssa Huang b4d5f163a9
KAFKA-17067; Fix KRaft transition to CandidateState (#16820)
Only voters should be able to transition to Candidate state. This removes VotedState as one of the EpochStates and moves voted information into UnattachedState.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2024-08-10 07:43:16 -04:00
José Armando García Sancio 8ce514a52e
KAFKA-16534; Implemeent update voter sending (#16837)
This change implements the KRaft voter sending UpdateVoter request. The
UpdateVoter RPC is used to update a voter's listeners and supported
kraft versions. The UpdateVoter RPC is sent if the replicated voter set
(VotersRecord in the log) doesn't match the local voter's supported
kraft versions and controller listeners.

To not starve the Fetch request, the UpdateVoter request is sent at most
every 3 fetch timeouts. This is required to make sure that replication
is making progress and eventually the voter set in the replicated log
matches the local voter configuration.

This change also modifies the semantic for UpdateVoter. Now the
UpdateVoter response is sent right after the leader has created the new
voter set. This is required so that updating voter can transition from
sending UpdateVoter request to sending Fetch request. If the leader
waits for the VotersRecord control record to commit before sending the
UpdateVoter response, it may never send the UpdateVoter response. This
can happen if the leader needs that voter's Fetch request to commit the
control record.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2024-08-08 16:16:09 -07:00
Colin Patrick McCabe 6a44fb154d
KAFKA-16523; kafka-metadata-quorum: support add-controller and remove-controller (#16774)
This PR adds support for add-controller and remove-controller in the kafka-metadata-quorum.sh
command-line tool. It also fixes some minor server-side bugs that blocked the tool from working.

In kafka-metadata-quorum.sh, the implementation of remove-controller is fairly straightforward. It
just takes some command-line flags and uses them to invoke AdminClient. The add-controller
implementation is a bit more complex because we have to look at the new controller's configuration
file. The parsing logic for the advertised.listeners and listeners server configurations that we
need was previously implemented in the :core module. However, the gradle module where
kafka-metadata-quorum.sh lives, :tools, cannot depend on :core. Therefore, I moved listener parsing
into SocketServerConfigs.listenerListToEndPoints. This will be a small step forward in our efforts
to move Kafka configuration out of :core.

I also made some minor changes in kafka-metadata-quorum.sh and Kafka-storage-tool.sh to handle
--help without displaying a backtrace on the screen, and give slightly better error messages on
stderr. Also, in DynamicVoter.toString, we now enclose the host in brackets if it contains a colon
(as IPV6 addresses can).

This PR fixes our handling of clusterId in addRaftVoter and removeRaftVoter, in two ways. Firstly,
it marks clusterId as nullable in the AddRaftVoterRequest.json and RemoveRaftVoterRequest.json
schemas, as it was always intended to be. Secondly, it allows AdminClient to optionally send
clusterId, by using AddRaftVoterOptions and RemoveRaftVoterOptions. We now also remember to
properly set timeoutMs in AddRaftVoterRequest. This PR adds unit tests for
KafkaAdminClient#addRaftVoter and KafkaAdminClient#removeRaftVoter, to make sure they are sending
the right things.

Finally, I fixed some minor server-side bugs that were blocking the handling of these RPCs.
Firstly, ApiKeys.ADD_RAFT_VOTER and ApiKeys.REMOVE_RAFT_VOTER are now marked as forwardable so that
forwarding from the broker to the active controller works correctly. Secondly,
org.apache.kafka.raft.KafkaNetworkChannel has now been updated to enable API_VERSIONS_REQUEST and
API_VERSIONS_RESPONSE.

Co-authored-by: Murali Basani muralidhar.basani@aiven.io
Reviewers: José Armando García Sancio <jsancio@apache.org>, Alyssa Huang <ahuang@confluent.io>
2024-08-08 15:54:12 -07:00
Dmitry Werner c9d415f361
KAFKA-17275: Move ReplicatedCounter to test scope (#16814)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
2024-08-08 15:56:21 +02:00
José Armando García Sancio e49c8df1f7 KAFKA-16533; Update voter handling
Add support for handling the update voter RPC. The update voter RPC is used to automatically update
the voters supported kraft versions and available endpoints as the operator upgrades and
reconfigures the KRaft controllers.

The add voter RPC is handled as follow:

1. Check that the leader has fenced the previous leader(s) by checking that the HWM is known;
   otherwise, return the REQUEST_TIMED_OUT error.

2. Check that the cluster supports kraft.version 1; otherwise, return the UNSUPPORTED_VERSION error.

3. Check that there are no uncommitted voter changes, otherwise return the REQUEST_TIMED_OUT error.

4. Check that the updated voter still supports the currently finalized kraft.version; otherwise
   return the INVALID_REQUEST error.

5. Check that the updated voter is still listening on the default listener.

6. Append the updated VotersRecord to the log. The KRaft internal listener will read this
   uncommitted record from the log and update the voter in the set of voters.

7. Wait for the VotersRecord to commit using the majority of the voters. Return a REQUEST_TIMED_OUT
   error if it doesn't commit in time.

8. Send the UpdateVoter successful response to the voter.

This change also implements the ability for the leader to update its own entry in the voter
set when it becomes leader for an epoch. This is done by updating the voter set and writing a
control batch as the first batch in a new leader epoch.

Finally, fix a bug in KafkaAdminClient's handling of removeRaftVoterResponse where we tried to cast
the response to the wrong type.

Reviewers: Alyssa Huang <ahuang@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
2024-08-05 11:32:21 -07:00
Colin Patrick McCabe 02f541d4ea
KAFKA-16518: Implement KIP-853 flags for storage-tool.sh (#16669)
As part of KIP-853, storage-tool.sh now has two new flags: --standalone, and --initial-voters. This PR implements these two flags in storage-tool.sh.

There are currently two valid ways to format a cluster:

The pre-KIP-853 way, where you use a statically configured controller quorum. In this case, neither --standalone nor --initial-voters may be specified, and kraft.version must be set to 0.

The KIP-853 way, where one of --standalone and --initial-voters must be specified with the initial value of the dynamic controller quorum. In this case, kraft.version must be set to 1.

This PR moves the formatting logic out of StorageTool.scala and into Formatter.java. The tool file was never intended to get so huge, or to implement complex logic like generating metadata records. Those things should be done by code in the metadata or raft gradle modules. This is also useful for junit tests, which often need to do formatting. (The 'info' and 'random-uuid' commands remain in StorageTool.scala, for now.)

Reviewers: José Armando García Sancio <jsancio@apache.org>
2024-08-02 15:47:45 -07:00
Alyssa Huang 2cf87bff9b
KAFKA-16953; Properly implement the sending of DescribeQuorumResponse (#16637)
This change allows the KRaft leader to send the DescribeQuorumResponse version based on the schema version used by the client.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2024-07-29 14:36:17 -04:00
José Armando García Sancio da32dcab2c
KAKFA-16537; Implement remove voter RPC (#16670)
Implement the RemoveVoter RPC. The general algorithm is as follow:

1. Check that the leader has fenced the previous leader(s) by checking that the HWM is known;
  otherwise return the REQUEST_TIMED_OUT error.
2. Check that the cluster supports kraft.version 1; otherwise return the UNSUPPORTED_VERSION error.
3. Check that there are no uncommitted voter changes; otherwise return the REQUEST_TIMED_OUT error.
4. Append the updated VotersRecord to the log. The KRaft internal listener will read this uncommitted
  record from the log and add the new voter to the set of voters.
5. Wait for the VotersRecord to commit using the majority of the new set of voters. Return a 
  REQUEST_TIMED_OUT error if it doesn't commit in time.
6. Send the RemoveVoter successful response to the client.
7. Resign the leadership if the leader is not in the new voter set

One thing to note is that now that KRaft supports both the remove voter and add voter RPC. Only one
change can be pending at once. This is achieved in the following ways. The AddVoter RPC checks if
there are pending AddVoter or RemoveVoter RPC. The RemoveVoter RPC checks if there are any
pending AddVoter or RemoveVoter RPC. Both RPCs check that there is no uncommitted VotersRecord.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2024-07-26 16:25:41 -07:00
José Armando García Sancio 9db5c2481f KAFKA-16535; Implement KRaft add voter handling
This change implements the AddVoter RPC. The high-level algorithm is as follow:

1. Check that the leader has fenced the previous leader(s) by checking that the HWM is known;
   otherwise, return the REQUEST_TIMED_OUT error.
2. Check that the cluster supports kraft.version 1; otherwise return the UNSUPPORTED_VERSION error.
3. Check that there are no uncommitted voter changes; otherwise return the REQUEST_TIMED_OUT error.
4. Check that the new voter's id is not part of the existing voter set, otherwise return the
   DUPLICATE_VOTER error.
5. Send an API_VERSIONS RPC to the first (default) listener to discover the supported kraft.version
   of the new voter.
6. Check that the new voter supports the current kraft.version, otherwise return the
   INVALID_REQUEST error.
7. Check that the new voter is caught up to the log end offset of the leader, otherwise return a
   REQUEST_TIMED_OUT error.
8. Append the updated VotersRecord to the log. The KRaft internal listener will read this
   uncommitted record from the log and add the new voter to the set of voters.
9.  Wait for the VotersRecord to commit using the majority of the new set of voters. Return a
    REQUEST_TIMED_OUT error if it doesn't commit in time.
10. Send the AddVoter successful response to the client.

The leader implements the above algorithm by tracking 3 events: the ADD_VOTER request is received,
the API_VERSIONS response is received and finally the HWM is updated.

The state of the ADD_VOTER operation is tracked by LeaderState using the AddVoterHandlerState. The
algorithm is implemented by the AddVoterHandler type.

This change also fixes a small issue introduced by the bootstrap checkpoint (0-0.checkpoint). The
internal partition listener (KRaftControlRecordStateMachine) and the external partition listener
(KafkaRaftClient.ListenerContext) were using "nextOffset = 0" as the initial state of the reading
cursor. This was causing the bootstrap checkpoint to keep getting reloaded until the leader wrote a
record to the log. Changing the initial offset (nextOffset) to -1 allows the listeners to
distinguish between the initial state (nextOffset == -1) and the bootstrap checkpoint was loaded
but the log segment is empty (nextOffset == 0).

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2024-07-22 13:07:38 -07:00
Volk 43fdc6ae08
KAFKA-17122 Change the type of `clusterId` from `UUID` to `String` (#16590)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-07-19 10:37:58 +08:00
Mickael Maison 7e3dde99d7
MINOR: Various cleanups in raft (#16611)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-07-18 18:48:29 +08:00
Colin Patrick McCabe 4d3e366bc2
KAFKA-16772: Introduce kraft.version to support KIP-853 (#16230)
Introduce the KRaftVersion enum to describe the current value of kraft.version. Change a bunch of places in the code that were using raw shorts over to using this new enum.

In BrokerServer.scala, fix a bug that could cause null pointer exceptions during shutdown if we tried to shut down before fully coming up.

Do not send finalized features that are finalized as level 0, since it is a no-op.

Reviewers: dengziming <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@apache.org>
2024-07-16 09:31:10 -07:00
Mason Chen e1bf155270
KAFKA-17055; Use random replica ids in kraft protocol tests
All of the tests in KafkakRaftClientTest and KafkaRaftClientSnapshotTest use well known ids like 0, 1, etc. Because of this those tests were not able to catch a bug in the BeginQuorumEpoch schema were the default value for VoterId was 0 instead of -1.

Improve those tests by using random valid replica id to lower the probability that the replica id will match the default value of the schema.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2024-07-16 11:33:32 -04:00
Volk 15eb555b03
MINOR: Fix the typo in RaftEventSimulationTest.java and ControllerNode.java (#16591)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-07-15 18:15:22 +08:00
Alyssa Huang 7495e70365
KAFKA-16532; Support for first leader bootstrapping the voter set (#16518)
The first leader of a KRaft topic partition must rewrite the content of the bootstrap checkpoint (0-0.checkpoint) to the log so that it is replicated. Bootstrap checkpoints are not replicated to the followers.

The control records for KRaftVersionRecord and VotersRecord in the bootstrap checkpoint will be written in one batch along with the LeaderChangeMessage. The leader will write these control records before accepting data records from the state machine (Controller).

The leader determines that the bootstrap checkpoint has not been written to the log if the latest set of voters is located at offset -1. This is the last contained offset for the bootstrap checkpoint (0-0.checkpoint).

This change also improves the RaftClientTestContext to allow for better testing of the reconfiguration functionality. This is mainly done by allowing the voter set to be configured statically or through the bootstrap checkpoint.

Reviewers: José Armando García Sancio <jsancio@apache.org>, Colin P. McCabe <cmccabe@apache.org>
Co-authors: José Armando García Sancio <jsancio@apache.org>
2024-07-12 13:44:21 -07:00
José Armando García Sancio 376365d9da
MINOR; Add property methods to LogOffsetMetadata (#16521)
This change simply adds property methods to LogOffsetMetadata. It
changes all of the callers to use the new property methods instead of
using the fields directly.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2024-07-04 15:03:32 -04:00
José Armando García Sancio 9f7afafefe
KAFKA-16529; Implement raft response handling (#16454)
This change implements response handling for the new version of Vote, Fetch, FetchSnapshot, BeginQuorumEpoch and EndQuorumEpoch. All of these responses were extended to include the leader's endpoint when the leader is known.

This change also includes sending the new version of the requests for Vote, Fetch, FetchSnapshot, BeginQuorumEpoch and EndQuorumEpoch. The two most notable changes are that:
1.  The leader is going to include all of its endpoints in the BeginQuorumEpoch and EndQuorumEpoch requests.
2. The replica is going to include the destination replica key for the Vote and BeginQuorumEpoch request.

QuorumState was extended so that the replica transitions to UnattachedState instead of FollowerState during startup, if the leader is known but the leader's endpoint is not known. This can happen if the known leader is not part of the voter set replicated by the replica. The expectation is that the replica will rediscover the leader from Fetch responses from the bootstrap servers or from the BeginQuorumEpoch request from the leader.

To make sure that replicas never forget the leader of a given epoch the unattached state was extended to allow an optional leader id for when the leader is known but the leader's endpoint is not known.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2024-07-03 16:52:35 -04:00
Alyssa Huang b0054f3a2f
KAFKA-16536; Use BeginQuorumEpoch as leader heartbeat (#16399)
With KIP-853, the leader's endpoint is sent to the other voters using the BeginQuorumEpoch RPC. The remote replicas never store the leader's endpoint. That means that leaders need to resend the leader's endpoint if a voter restarts.

This change accomplishes this by sending the BeginQuorumEpoch as a heartbeat. The period is sent to the half the fetch timeout to prevent voters from transitioning to the candidate state when restarting.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2024-06-28 10:27:30 -04:00
José Armando García Sancio 9be27e715a
MINOR; Fix incompatible change to the kafka config (#16464)
Prior to KIP-853, users were not allow to enumerate listeners specified in `controller.listener.names` in the `advertised.listeners`. This decision was made in 3.3 because the `controller.quorum.voters` property is in effect the list of advertised listeners for all of the controllers.

KIP-853 is moving away from `controller.quorum.voters` in favor of a dynamic set of voters. This means that the user needs to have a way of specifying the advertised listeners for controller.

This change allows the users to specify listener names in `controller.listener.names` in `advertised.listeners`. To make this change forwards compatible (use a valid configuration from 3.8 in 3.9), the controller's advertised listeners are going to get computed by looking up the endpoint in `advertised.listeners`. If it doesn't exist, the controller will look up the endpoint in the `listeners` configuration.

This change also includes a fix the to the BeginQuorumEpoch request where the default value for VoterId was 0 instead of -1.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2024-06-27 21:24:25 -04:00
José Armando García Sancio adee6f0cc1
KAFKA-16527; Implement request handling for updated KRaft RPCs (#16235)
Implement request handling for the updated versions of the KRaft RPCs (Fetch, FetchSnapshot, Vote,
BeginQuorumEpoch and EndQuorumEpoch). This doesn't add support for KRaft replicas to send the new
version of the KRaft RPCs. That will be implemented in KAFKA-16529.

All of the RPCs responses were extended to include the leader's endpoint for the listener of the
channel used in the request. EpochState was extended to include the leader's endpoint information
but only the FollowerState and LeaderState know the leader id and its endpoint(s).

For the Fetch request, the replica directory id was added. The leader now tracks the follower's log
end offset using both the replica id and replica directory id.

For the FetchSnapshot request, the replica directory id was added. This is not used by the KRaft
leader and it is there for consistency with Fetch and for help debugging.

For the Vote request, the replica key for both the voter (destination) and the candidate (source)
were added. The voter key is checked for consistency. The candidate key is persisted when the vote
is granted.

For the BeginQuorumEpoch request, all of the leader's endpoints are included. This is needed so
that the voters can return the leader's endpoint for all of the supported listeners.

For the EndQuorumEpoch request, all of the leader's endpoints are included. This is needed so that
the voters can return the leader's endpoint for all of the supported listeners. The successor list
has been extended to include the directory id. Receiving voters can use the entire replica key when
searching their position in the successor list.

Updated the existing test in KafkaRaftClientTest and KafkaRaftClientSnapshotTest to execute using
both the old version and new version of the RPCs.

Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2024-06-25 13:45:15 -07:00
dujian0068 78372b6883
KAFKA-17013 RequestManager#ConnectionState#toString() should use %s (#16413)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-24 23:44:39 +08:00
Luke Chen d646a09dd0
KAFKA-16531: calculate check-quorum when leader is not in voter set (#16211)
In the check-quorum calculation, the leader should not assume that it is part of the voter set. This may happen when the leader is removing itself from the voter set. This PR improves it by checking if leader is in the voter set or not, and decide how many follower fetches required. Also add tests.

Co-authored-by: Colin P. McCabe <cmccabe@apache.org>

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, José Armando García Sancio <jsancio@apache.org>
2024-06-21 11:22:24 +08:00
PoAn Yang 9ac102596b
KAFKA-16971 Fix the incorrect format string in QuorumConfigs#parseBootstrapServer (#16358)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-17 07:55:47 +08:00
gongxuanzhang d239dde8f6
KAFKA-10787 Apply spotless to raft module (#16278)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-15 11:28:36 +08:00
Omnia Ibrahim e99da2446c
KAFKA-15853: Move KafkaConfig.configDef out of core (#16116)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-06-14 17:26:00 +02:00
gongxuanzhang 596b945072
KAFKA-16643 Add ModifierOrder checkstyle rule (#15890)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-13 15:39:32 +08:00
Nikolay aecaf44475
KAFKA-16520: Support KIP-853 in DescribeQuorum (#16106)
Add support for KIP-953 KRaft Quorum reconfiguration in the DescribeQuorum request and response.
Also add support to AdminClient.describeQuorum, so that users will be able to find the current set of
quorum nodes, as well as their directories, via these RPCs.

Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Andrew Schofield <aschofield@confluent.io>
2024-06-11 10:01:35 -07:00
Alyssa Huang f880ad6ccf
KAFKA-16530: Fix high-watermark calculation to not assume the leader is in the voter set (#16079)
1. Changing log message from error to info - We may expect the HW calculation to give us a smaller result than the current HW in the case of quorum reconfiguration. We will continue to not allow the HW to actually decrease.
2. Logic for finding the updated LeaderEndOffset for updateReplicaState is changed as well. We do not assume the leader is in the voter set and check the observer states as well.
3. updateLocalState now accepts an additional "lastVoterSet" param which allows us to update the leader state with the last known voters. any nodes in this set but not in voterStates will be added to voterStates and removed from observerStates, any nodes not in this set but in voterStates will be removed from voterStates and added to observerStates

Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@apache.org>
2024-06-06 14:30:49 +08:00
Sanskar Jhajharia 896af1b2f2
MINOR: Raft module Cleanup (#16205)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-06 04:16:59 +08:00
José Armando García Sancio 459da4795a
KAFKA-16525; Dynamic KRaft network manager and channel (#15986)
Allow KRaft replicas to send requests to any node (Node) not just the nodes configured in the
controller.quorum.voters property. This flexibility is needed so KRaft can implement the
controller.quorum.voters configuration, send request to the dynamically changing set of voters and
send request to the leader endpoint (Node) discovered through the KRaft RPCs (specially
BeginQuorumEpoch request and Fetch response).

This was achieved by changing the RequestManager API to accept Node instead of just the replica ID.
Internally, the request manager tracks connection state using the Node.idString method to match the
connection management used by NetworkClient.

The API for RequestManager is also changed so that the ConnectState class is not exposed in the
API. This allows the request manager to reclaim heap memory for any connection that is ready.

The NetworkChannel was updated to receive the endpoint information (Node) through the outbound raft
request (RaftRequent.Outbound). This makes the network channel more flexible as it doesn't need to
be configured with the list of all possible endpoints. RaftRequest.Outbound and
RaftResponse.Inbound were updated to include the remote node instead of just the remote id.

The follower state tracked by KRaft replicas was updated to include both the leader id and the
leader's endpoint (Node). In this comment the node value is computed from the set of voters. In
future commit this will be updated so that it is sent through KRaft RPCs. For example
BeginQuorumEpoch request and Fetch response.

Support for configuring controller.quorum.bootstrap.servers was added. This includes changes to
KafkaConfig, QuorumConfig, etc. All of the tests using QuorumTestHarness were changed to use the
controller.quorum.bootstrap.servers instead of the controller.quorum.voters for the broker
configuration. Finally, the node id for the bootstrap server will be decreasing negative numbers
starting with -2.

Reviewers: Jason Gustafson <jason@confluent.io>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2024-06-03 14:24:48 -07:00
dengziming 131ce0ba59
Minor: Fix VoterSetHistoryTest.testAddAt (#16104)
Reviewers: Luke Chen <showuon@gmail.com>
2024-05-30 10:28:07 +08:00
Colin P. McCabe 90892ae99f KAFKA-16516: Fix the controller node provider for broker to control channel
Fix the code in the RaftControllerNodeProvider to query RaftManager to find Node information,
rather than consulting a static map. Add a RaftManager.voterNode function to supply this
information. In KRaftClusterTest, add testControllerFailover to get more coverage of controller
failovers.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2024-05-24 09:52:47 -07:00
Mickael Maison affe8da54c
KAFKA-7632: Support Compression Levels (KIP-390) (#15516)
Reviewers: Jun Rao <jun@confluent.io>,  Luke Chen <showuon@gmail.com>
Co-authored-by: Lee Dongjin <dongjin@apache.org>
2024-05-21 17:58:49 +02:00
José Armando García Sancio 056d232f4e
KAFKA-16526; Quorum state data version 1 (#15859)
Allow KRaft replicas to read and write version 0 and 1 of the quorum-state file. Which version is written is controlled by the kraft.version. With kraft.version 0, version 0 of the quorum-state file is written. With kraft.version 1, version 1 of the quorum-state file is written. Version 1 of the quorum-state file adds the VotedDirectoryId field and removes the CurrentVoters. The other fields removed in version 1 are not important as they were not overwritten or used by KRaft.

In kraft.version 1 the set of voters will be stored in the kraft partition log segments and snapshots.

To implement this feature the following changes were made to KRaft.

FileBasedStateStore was renamed to FileQuorumStateStore to better match the name of the implemented interface QuorumStateStore.

The QuorumStateStore::writeElectionState was extended to include the kraft.version. This version is used to determine which version of QuorumStateData to store. When writing version 0 the VotedDirectoryId is not persisted but the latest value is kept in-memory. This allows the replica to vote consistently while they stay online. If a replica restarts in the middle of an election it will forget the VotedDirectoryId if the kraft.version is 0. This should be rare in practice and should only happen if there is an election and failure while the system is upgrading to kraft.version 1.

The type ElectionState, the interface EpochState and all of the implementations of EpochState (VotedState, UnattachedState, FollowerState, ResignedState, CandidateState and LeaderState) are extended to support the new voted directory id.

The type QuorumState is changed so that local directory id is used. The type is also changed so that the latest value for the set of voters and the kraft version is query from the KRaftControlRecordStateMachine.

The replica directory id is read from the meta.properties and passed to the KafkaRaftClient. The replica directory id is guaranteed to be set in the local replica.

Adds a new metric for current-vote-directory-id which exposes the latest in-memory value of the voted directory id.

Renames VoterSet.VoterKey to ReplicaKey.

It is important to note that after this change, version 1 of the quorum-state file will not be written by kraft controllers and brokers. This change adds support reading and writing version 1 of the file in preparation for future changes.

Reviewers: Jun Rao <junrao@apache.org>
2024-05-16 09:53:36 -04:00
José Armando García Sancio 440f5f6c09
MINOR; Validate at least one control record (#15912)
Validate that a control batch in the batch accumulator has at least one control record.

Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai <chia7712@apache.org>
2024-05-14 10:02:29 -04:00
José Armando García Sancio bfe81d6229
KAFKA-16207; KRaft's internal log listener to update voter set (#15671)
Adds support for the KafkaRaftClient to read the control records KRaftVersionRecord and VotersRecord in the snapshot and log. As the control records in the KRaft partition are read, the replica's known set of voters are updated. This change also contains the necessary changes to include the control records when a snapshot is generated by the KRaft state machine.

It is important to note that this commit changes the code and the in-memory state to track the sets of voters but it doesn't change any data that is externally exposed. It doesn't change the RPCs, data stored on disk or configuration.

When the KRaft replica starts the PartitionListener reads the latest snapshot and then log segments up to the LEO, updating the in-memory state as it reads KRaftVersionRecord and VotersRecord. When the replica (leader and follower) appends to the log, the PartitionListener catches up to the new LEO. When the replica truncates the log because of a diverging epoch, the PartitionListener also truncates the in-memory state to the new LEO. When the state machine generate a new snapshot the PartitionListener trims any prefix entries that are not needed. This is all done to minimize the amount of data tracked in-memory and to make sure that it matches the state on disk.

To implement the functionality described above this commit also makes the following changes:

Adds control records for KRaftVersionRecord and VotersRecord. KRaftVersionRecord describes the finalized kraft.version supported by all of the replicas. VotersRecords describes the set of voters at a specific offset.

Changes Kafka's feature version to support 0 as the smallest valid value. This is needed because the default value for kraft.version is 0.

Refactors FileRawSnapshotWriter so that it doesn't directly call the onSnapshotFrozen callback. It adds NotifyingRawSnapshotWriter for calling such callbacks. This reorganization is needed because in this change both the KafkaMetadataLog and the KafkaRaftClient need to react to snapshots getting frozen.

Cleans up KafkaRaftClient's initialization. Removes initialize from RaftClient - this is an implementation detail that doesn't need to be exposed in the interface. Removes RaftConfig.AddressSpec and simplifies the bootstrapping of the static voter's address. The bootstrapping of the address is delayed because of tests. We should be able to simplify this further in future commits.

Update the DumpLogSegment CLI to support the new control records KRaftVersionRecord and VotersRecord.

Fix the RecordsSnapshotReader implementations so that the iterator includes control records. RecordsIterator is extended to support reading the new control records.
Improve the BatchAccumulator implementation to allow multiple control records in one control batch. This is needed so that KRaft can make sure that VotersRecord is included in the same batch as the control record (KRaftVersionRecord) that upgrades the kraft.version to 1.

Add a History interface and default implementation TreeMapHistory. This is used to track all of the sets of voters between the latest snapshot and the LEO. This is needed so that KafkaRaftClient can query for the latest set of voters and so that KafkaRaftClient can include the correct set of voters when the state machine generates a new snapshot at a given offset.

Add a builder pattern for RecordsSnapshotWriter. The new builder pattern also implements including the KRaftVersionRecord and VotersRecord control records in the snapshot as necessary. A KRaftVersionRecord should be appended if the kraft.version is greater than 0 at the snapshot's offset. Similarly, a VotersRecord should be appended to the snapshot with the latest value up to the snapshot's offset.

Reviewers: Jason Gustafson <jason@confluent.io>
2024-05-04 12:43:16 -07:00
Mickael Maison e7792258df
MINOR: Various cleanups in raft (#15805)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-26 15:20:09 +02:00
Omnia Ibrahim 6feae817d2
MINOR: Rename RaftConfig to QuorumConfig (#15797)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-26 03:08:31 +08:00
Kuan-Po (Cooper) Tseng 12a1d85362
KAFKA-12187 replace assertTrue(obj instanceof X) with assertInstanceOf (#15512)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-03-20 10:36:25 +08:00
José Armando García Sancio 474f8c1ad6
KAFKA-16286; Notify listener of latest leader and epoch (#15397)
KRaft was only notifying listeners of the latest leader and epoch when the replica transition to a new state. This can result in the listener never getting notified if the registration happened after it had become a follower.

This problem doesn't exists for the active leader because the KRaft implementation attempts to notified the listener of the latest leader and epoch when the replica is the active leader.

This issue is fixed by notifying the listeners of the latest leader and epoch after processing the listener registration request.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2024-02-23 12:56:25 -08:00
yicheny af0aceb4d7
MINOR: fix word spelling mistakes (#15331)
fix word spelling mistakes

Reviewers: Luke Chen <showuon@gmail.com>
2024-02-07 15:30:01 +08:00
Luke Chen 70bd4ce8a7
KAFKA-16144: skip checkQuorum for only 1 voter case (#15235)
When there's only 1 voter, there will be no fetch request from other voters. In this case, we should still not expire the checkQuorum timer because there's just 1 voter.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Federico Valeri <fedevaleri@gmail.com>, José Armando García Sancio <jsancio@apache.org>
2024-01-23 10:17:53 +08:00