519 Commits

Author SHA1 Message Date
TengYao Chi 8438c4339e
KAFKA-17245: Revert TopicRecord changes. (#16780)
Revert KAFKA-16257 changes because KIP-950 doesn't need it anymore.

Reviewers: Luke Chen <showuon@gmail.com>
2024-08-03 20:15:51 +08:00
Colin Patrick McCabe 02f541d4ea
KAFKA-16518: Implement KIP-853 flags for storage-tool.sh (#16669)
As part of KIP-853, storage-tool.sh now has two new flags: --standalone, and --initial-voters. This PR implements these two flags in storage-tool.sh.

There are currently two valid ways to format a cluster:

The pre-KIP-853 way, where you use a statically configured controller quorum. In this case, neither --standalone nor --initial-voters may be specified, and kraft.version must be set to 0.

The KIP-853 way, where one of --standalone and --initial-voters must be specified with the initial value of the dynamic controller quorum. In this case, kraft.version must be set to 1.
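
The two formatting modes above boil down to a small validity rule. The following is a minimal sketch of that rule only; the class and method names are hypothetical and are not taken from Formatter.java or StorageTool.scala, and rejecting both flags together is an assumption based on the wording "one of".

```java
// Hypothetical validator; the names below are illustrative and are not
// taken from Formatter.java or StorageTool.scala.
final class FormatFlagCheck {
    /**
     * Returns true if the flag combination is valid for the given kraft.version.
     * Treating "both flags at once" as invalid is an assumption of this sketch.
     */
    static boolean isValidCombination(int kraftVersion, boolean standalone, boolean initialVoters) {
        int dynamicQuorumFlags = (standalone ? 1 : 0) + (initialVoters ? 1 : 0);
        if (kraftVersion == 0) {
            // Pre-KIP-853: statically configured quorum, neither flag allowed.
            return dynamicQuorumFlags == 0;
        }
        if (kraftVersion == 1) {
            // KIP-853: one of the two flags supplies the initial dynamic quorum.
            return dynamicQuorumFlags == 1;
        }
        return false; // other versions are outside the scope of this sketch
    }

    public static void main(String[] args) {
        System.out.println(isValidCombination(0, false, false));
        System.out.println(isValidCombination(1, true, false));
        System.out.println(isValidCombination(0, true, false));
    }
}
```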

This PR moves the formatting logic out of StorageTool.scala and into Formatter.java. The tool file was never intended to get so huge, or to implement complex logic like generating metadata records. Those things should be done by code in the metadata or raft gradle modules. This is also useful for junit tests, which often need to do formatting. (The 'info' and 'random-uuid' commands remain in StorageTool.scala, for now.)

Reviewers: José Armando García Sancio <jsancio@apache.org>
2024-08-02 15:47:45 -07:00
Chung, Ming-Yen 7c0a96d08d
KAFKA-17185 Declare Loggers as static to prevent multiple logger instances (#16680)
As discussed in #16657 (comment), loggers should be static to avoid creating multiple logger instances.
I used the regex private.*Logger.*LoggerFactory to find candidates and checked each result to decide whether the logger needs to be static.

There are some exceptions where loggers do not need to be static:
1) The logger is in an inner class, since Java 8 doesn't support static fields in inner classes.
        https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java#L3676

2) Custom loggers for each instance (non-static and non-final). In this case, multiple logger instances are actually needed.
        https://github.com/apache/kafka/blob/trunk/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java#L166

3) The logger is initialized in the constructor by LogContext. Many non-static but final loggers fall into this category, which is why I use .*LoggerFactory to check only the loggers that are assigned an initial value at declaration.

4) protected final Logger log = Logger.getLogger(getClass())
    This lets a subclass log with the subclass name instead of the superclass name.
    But if the log access modifier is private, that purpose cannot be achieved, since a subclass cannot access the log defined in the superclass. So if the access modifier is private, we can replace getClass() with <className>.class
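
To illustrate how the quoted regex separates candidates from the exceptions listed above, here is a small sketch. The regex is the one from the commit message; the helper class, the `needsReview` method, and the sample declarations are hypothetical, and the real check was done by hand on search results.

```java
import java.util.regex.Pattern;

// Hypothetical helper that applies the regex from the commit message
// to single source lines.
final class LoggerDeclarationScan {
    // The regex quoted in the commit message.
    static final Pattern CANDIDATE = Pattern.compile("private.*Logger.*LoggerFactory");

    /** True if the line matches the regex and is not already declared static. */
    static boolean needsReview(String line) {
        return CANDIDATE.matcher(line).find() && !line.contains(" static ");
    }

    public static void main(String[] args) {
        String[] samples = {
            "    private final Logger log = LoggerFactory.getLogger(Foo.class);",
            "    private static final Logger LOG = LoggerFactory.getLogger(Foo.class);",
            "    private final Logger log = logContext.logger(Foo.class);",
            "    protected final Logger log = Logger.getLogger(getClass());"
        };
        for (String sample : samples) {
            System.out.println(needsReview(sample) + " <- " + sample.trim());
        }
    }
}
```

Only the first sample is flagged: the second is already static, the third is assigned from a LogContext and so never mentions LoggerFactory, and the fourth is not private.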

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-07-31 02:37:36 +08:00
Luke Chen 1b11fef5bb
KAFKA-17205: Allow topic config validation in controller level in KRaft mode (#16693)
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Christo Lolov <lolovc@amazon.com>
2024-07-30 17:07:09 +01:00
Justine Olshan a0f6e6f816
KAFKA-16192: Introduce transaction.version and usage of flexible records to coordinators (#16183)
This change includes adding transaction.version (part of KIP-1022)

New transaction version 1 is introduced to support writing flexible fields in transaction state log messages.

Transaction version 2 is created in anticipation for further KIP-890 changes.

Neither is production ready yet. Tests for the new transaction version and the new MV are added.

Also includes a change to not report a feature as supported if its range is 0-0.

Reviewers: Jun Rao <junrao@apache.org>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
2024-07-26 11:38:44 -07:00
Logan Zhu 3589f45656
MINOR: Replace lambda expressions with method references for ReplicationControlManager (#16547)
Reviewers: Xuan-Zhang Gong <gongxuanzhangmelt@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-07-24 19:44:54 +08:00
Mickael Maison 90b779b7bb
MINOR: Various cleanups in metadata (#16610)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-07-22 10:26:09 +08:00
Chung, Ming-Yen 66655ab49a
KAFKA-17095 Fix the typo from "CreateableTopicConfig" to "CreatableTopicConfig" (#16623)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-07-19 11:09:08 +08:00
Colin Patrick McCabe 4d3e366bc2
KAFKA-16772: Introduce kraft.version to support KIP-853 (#16230)
Introduce the KRaftVersion enum to describe the current value of kraft.version. Change a bunch of places in the code that were using raw shorts over to using this new enum.

In BrokerServer.scala, fix a bug that could cause null pointer exceptions during shutdown if we tried to shut down before fully coming up.

Do not send finalized features that are finalized as level 0, since it is a no-op.

Reviewers: dengziming <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@apache.org>
2024-07-16 09:31:10 -07:00
David Arthur 8aee314a46
KAFKA-16667 Avoid stale read in KRaftMigrationDriver (#15918)
When becoming the active KRaftMigrationDriver, there is another race condition similar to KAFKA-16171. This time, the race is due to a stale read from ZK. After writing to /controller and /controller_epoch, it is possible that a read on /migration is not linearized with the writes that were just made. In other words, we get a stale read on /migration. This leads to an inability to sync metadata to ZK due to incorrect zkVersion on the migration ZNode.

The non-linearizability of reads is in fact documented behavior for ZK, so we need to handle it.

To fix the stale read, this patch adds a write to /migration after updating /controller and /controller_epoch. This allows us to learn the correct zkVersion for the migration ZNode before leaving the BECOME_CONTROLLER state.

This patch also adds a check on the current leader epoch when running certain events in KRaftMigrationDriver. Historically, we did not include this check because it is not necessary for correctness. Writes to ZK are gated on the /controller_epoch zkVersion, and RPCs sent to brokers are gated on the controller epoch. However, during a time of rapid failover, there is a lot of processing happening on the controller (i.e., full metadata sync to ZK and full UMRs sent to brokers), so it is best to avoid running events we know will fail.

There is also a small fix in here to improve the logging of ZK operations. The log messages are changed to the past tense to reflect the fact that the operations have already happened by the time the log message is created.

Reviewers: Igor Soarez <soarez@apple.com>
2024-07-15 09:32:06 -04:00
Colin Patrick McCabe ebaa108967
KAFKA-16968: Introduce 3.8-IV0, 3.9-IV0, 3.9-IV1
Create 3 new metadata versions:

- 3.8-IV0, for the upcoming 3.8 release.
- 3.9-IV0, to add support for KIP-1005.
- 3.9-IV1, as the new release vehicle for KIP-966.

Create ListOffsetRequest v9, which will be used in 3.9-IV0 to support KIP-1005. v9 is currently an unstable API version.

Reviewers: Jun Rao <junrao@gmail.com>, Justine Olshan <jolshan@confluent.io>
2024-06-27 14:03:03 -07:00
Murali Basani 87f8147ed0
KAFKA-16855 : Part 1 - New fields tieredEpoch and tieredState (#16257)
Add field tieredEpoch to RemoteLogSegmentMetadata
Update relevant tests
Add two fields tieredEpoch and tieredState to TopicRecord.json

Reviewers: Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>
2024-06-25 15:00:12 +01:00
TingIāu "Ting" Kì b2758f4ac6
KAFKA-16989 Use StringBuilder instead of String concatenation (#16385)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-19 09:19:32 +08:00
Colin Patrick McCabe 2fd00ce536
KAFKA-16952: Do not bump broker epoch when re-registering the same incarnation (#16333)
As part of KIP-858 (Handle JBOD broker disk failure in KRaft), we added some code that caused the
broker to re-register itself when transitioning from a MetadataVersion that did not support broker
directory IDs, to one that did. This code was necessary because otherwise the controller would not
be aware of what directories the broker held.

However, prior to this PR, the re-registration process acted exactly like a full registration. That
is, it bumped the broker epoch (which is meant to only be bumped on broker restart). This PR fixes
the code to keep the broker epoch the same if the incarnation ID is the same.

There are some other minor improvements here:

- The previous logic relied on a complicated combination of request version and previous broker
  epoch to understand if the request came from the same broker or not. This is not needed: either
  the incarnation ID is the same and it's the same process, or it is not and it isn't.

- We now log whether we're amending a registration, registering a previously unknown broker, or
  replacing a previous registration.

- Move changes to the HeartbeatManager to the end of the function, so that we will not do them if
  any validation step fails. Log4j messages are also generated at the end, for the same reason.
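
The core rule of this fix (keep the broker epoch when the incarnation ID is unchanged) can be sketched as follows. This is not the ClusterControlManager code: the class, its fields, and the simple counter used as an epoch source are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical registry; the counter stands in for whatever the controller
// really uses to assign broker epochs.
final class BrokerEpochSketch {
    private static final class Registration {
        final UUID incarnationId;
        final long epoch;

        Registration(UUID incarnationId, long epoch) {
            this.incarnationId = incarnationId;
            this.epoch = epoch;
        }
    }

    private final Map<Integer, Registration> registrations = new HashMap<>();
    private long nextEpoch = 1L;

    /** Registers a broker and returns the broker epoch that applies afterwards. */
    long register(int brokerId, UUID incarnationId) {
        Registration existing = registrations.get(brokerId);
        if (existing != null && existing.incarnationId.equals(incarnationId)) {
            // Same incarnation ID means the same process: amend, do not bump.
            return existing.epoch;
        }
        // Unknown broker, or a restarted one: a new epoch is assigned.
        Registration fresh = new Registration(incarnationId, nextEpoch++);
        registrations.put(brokerId, fresh);
        return fresh.epoch;
    }

    public static void main(String[] args) {
        BrokerEpochSketch sketch = new BrokerEpochSketch();
        UUID incarnation = UUID.randomUUID();
        System.out.println(sketch.register(1, incarnation));
        System.out.println(sketch.register(1, incarnation));
        System.out.println(sketch.register(1, UUID.randomUUID()));
    }
}
```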

Reviewers: Ron Dagostino <rndgstn@gmail.com>
2024-06-18 07:03:15 -07:00
ChickenchickenLove 1a7ba667ad
MINOR improve startup log in QuorumController (#15926)
Reviewers: David Arthur <mumrah@gmail.com>
2024-06-17 11:04:12 -04:00
TingIāu "Ting" Kì 92d8d4bd1f
KAFKA-16970 Fix hash implementation of `ScramCredentialValue`, `ScramCredentialData`, and `ContextualRecord` (#16359)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-17 22:29:22 +08:00
gongxuanzhang 4e846038a6
KAFKA-10787 Apply spotless to `metadata` and `server` and `storage` module (#16297)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-16 05:28:50 +08:00
Kuan-Po (Cooper) Tseng 2e5cd0b476
MINOR: Refine javadoc in TopicsDelta TopicDelta LocalReplicaChanges (#16195)
Add more description to TopicsDelta TopicDelta LocalReplicaChanges

Reviewers: Luke Chen <showuon@gmail.com>
2024-06-14 11:22:19 +08:00
gongxuanzhang 596b945072
KAFKA-16643 Add ModifierOrder checkstyle rule (#15890)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-13 15:39:32 +08:00
Sanskar Jhajharia 226f3c57e3
MINOR: Code cleanup in metadata module (#16065)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
2024-06-06 15:18:23 +02:00
TingIāu "Ting" Kì d652f5cf54
MINOR: Add topicIds and directoryIds to the return value of the toString method. (#16189)
Add topicIds and directoryIds to the return value of the toString method.

Reviewers: Luke Chen <showuon@gmail.com>
2024-06-05 07:52:06 +08:00
Igor Soarez 7e0caad96e
MINOR: Cleanup unused references in core (#16192)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-05 05:12:33 +08:00
Igor Soarez 16359e70d3
KAFKA-16583: Handle PartitionChangeRecord without directory IDs (#16118)
When PartitionRegistration#merge() reads a PartitionChangeRecord
from an older MetadataVersion, with a replica assignment change
and without #directories() set, it produces a directory assignment
of DirectoryId.UNASSIGNED. This is problematic because the MetadataVersion
may not yet support directory assignments, leading to a
UnwritableMetadataException in PartitionRegistration#toRecord.

Since the Controller always sets directories on PartitionChangeRecord
if the MetadataVersion supports it, via PartitionChangeBuilder,
there's no need for PartitionRegistration#merge() to populate
directories upon a replica assignment change.

Reviewers: Luke Chen <showuon@gmail.com>
2024-06-04 15:37:20 +01:00
David Jacot 53d592e369
MINOR: Fix typo in MetadataVersion.IBP_4_0_IV0 (#16181)
This patch fixes a typo in MetadataVersion.IBP_4_0_IV0. It should be 0 not O.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-06-03 20:48:04 -07:00
Colin Patrick McCabe 8ace33b47f
KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 (#15945)
When upgrading from a MetadataVersion older than 3.7-IV2, we need to resend the broker registration, so that the controller can record the storage directories. The current code for doing this has several problems, however. One is that it tends to trigger even in cases where we don't actually need it. Another is that when re-registering the broker, the broker is marked as fenced.

This PR moves the handling of the re-registration case out of BrokerMetadataPublisher and into BrokerRegistrationTracker. The re-registration code there will only trigger in the case where the broker sees an existing registration for itself with no directories set. This is much more targeted than the original code.

Additionally, in ClusterControlManager, when re-registering the same broker, we now preserve its fencing and shutdown state, rather than clearing those. (There isn't any good reason re-registering the same broker should clear these things... this was purely an oversight.) Note that we can tell the broker is "the same" because it has the same IncarnationId.

Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Igor Soarez <soarez@apple.com>
2024-06-01 23:51:39 +01:00
David Jacot ba61ff0cd9
KAFKA-16860; [1/2] Introduce group.version feature flag (#16120)
This patch introduces the `group.version` feature flag with one version:
1) Version 1 enables the new consumer group rebalance protocol (KIP-848).

Reviewers: Justine Olshan <jolshan@confluent.io>
2024-05-31 12:48:55 -07:00
Justine Olshan 7c1bb1585f
KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config (#16130)
As per KIP-1022, we will rename the unstable metadata versions enabled config to support all feature versions.

Features is also updated to return latest production and latest testing versions of each feature.

A feature is production ready when the corresponding metadata version (bootstrapMetadataVersion) is production ready.

Adds tests for the feature usage of the unstableFeatureVersionsEnabled config

Reviewers: David Jacot <djacot@confluent.io>, Jun Rao <junrao@gmail.com>
2024-05-30 14:52:50 -07:00
Justine Olshan 5e3df22095
KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool (#15685)
As part of KIP-1022, I have created an interface for all the new features to be used when parsing the command line arguments, doing validations, getting default versions, etc.

I've also added the --feature flag to the storage tool to show how it will be used.

Created a TestFeatureVersion to show an implementation of the interface (besides MetadataVersion which is unique) and added tests using this new test feature.

I will add the unstable config and tests in a followup.

Reviewers: David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jun Rao <junrao@apache.org>
2024-05-29 16:36:06 -07:00
Mickael Maison affe8da54c
KAFKA-7632: Support Compression Levels (KIP-390) (#15516)
Reviewers: Jun Rao <jun@confluent.io>, Luke Chen <showuon@gmail.com>
Co-authored-by: Lee Dongjin <dongjin@apache.org>
2024-05-21 17:58:49 +02:00
Ken Huang 81e6098021
KAFKA-16797 A bit cleanup of FeatureControlManager (#15997)
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-05-20 17:19:01 +08:00
Gaurav Narula 412b05df00
KAFKA-16789 Fix thread leak detection for event handler threads (#15984)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-05-19 18:21:56 +08:00
José Armando García Sancio bfe81d6229
KAFKA-16207; KRaft's internal log listener to update voter set (#15671)
Adds support for the KafkaRaftClient to read the control records KRaftVersionRecord and VotersRecord in the snapshot and log. As the control records in the KRaft partition are read, the replica's known set of voters is updated. This change also contains the necessary changes to include the control records when a snapshot is generated by the KRaft state machine.

It is important to note that this commit changes the code and the in-memory state to track the sets of voters but it doesn't change any data that is externally exposed. It doesn't change the RPCs, data stored on disk or configuration.

When the KRaft replica starts, the PartitionListener reads the latest snapshot and then log segments up to the LEO, updating the in-memory state as it reads KRaftVersionRecord and VotersRecord. When the replica (leader and follower) appends to the log, the PartitionListener catches up to the new LEO. When the replica truncates the log because of a diverging epoch, the PartitionListener also truncates the in-memory state to the new LEO. When the state machine generates a new snapshot, the PartitionListener trims any prefix entries that are not needed. This is all done to minimize the amount of data tracked in-memory and to make sure that it matches the state on disk.

To implement the functionality described above this commit also makes the following changes:

Adds control records for KRaftVersionRecord and VotersRecord. KRaftVersionRecord describes the finalized kraft.version supported by all of the replicas. VotersRecord describes the set of voters at a specific offset.

Changes Kafka's feature version to support 0 as the smallest valid value. This is needed because the default value for kraft.version is 0.

Refactors FileRawSnapshotWriter so that it doesn't directly call the onSnapshotFrozen callback. It adds NotifyingRawSnapshotWriter for calling such callbacks. This reorganization is needed because in this change both the KafkaMetadataLog and the KafkaRaftClient need to react to snapshots getting frozen.

Cleans up KafkaRaftClient's initialization. Removes initialize from RaftClient - this is an implementation detail that doesn't need to be exposed in the interface. Removes RaftConfig.AddressSpec and simplifies the bootstrapping of the static voter's address. The bootstrapping of the address is delayed because of tests. We should be able to simplify this further in future commits.

Update the DumpLogSegment CLI to support the new control records KRaftVersionRecord and VotersRecord.

Fix the RecordsSnapshotReader implementations so that the iterator includes control records. RecordsIterator is extended to support reading the new control records.

Improve the BatchAccumulator implementation to allow multiple control records in one control batch. This is needed so that KRaft can make sure that VotersRecord is included in the same batch as the control record (KRaftVersionRecord) that upgrades the kraft.version to 1.

Add a History interface and default implementation TreeMapHistory. This is used to track all of the sets of voters between the latest snapshot and the LEO. This is needed so that KafkaRaftClient can query for the latest set of voters and so that KafkaRaftClient can include the correct set of voters when the state machine generates a new snapshot at a given offset.

Add a builder pattern for RecordsSnapshotWriter. The new builder pattern also implements including the KRaftVersionRecord and VotersRecord control records in the snapshot as necessary. A KRaftVersionRecord should be appended if the kraft.version is greater than 0 at the snapshot's offset. Similarly, a VotersRecord should be appended to the snapshot with the latest value up to the snapshot's offset.
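
The offset-indexed history described above (look up the latest value at an offset, drop entries after a log truncation, trim the prefix after a snapshot) maps naturally onto a sorted map. The sketch below is one possible shape built on java.util.TreeMap; the class and method names are hypothetical and should not be read as the History or TreeMapHistory API from this commit.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical offset-indexed history of values such as voter sets.
final class OffsetHistorySketch<T> {
    private final TreeMap<Long, T> history = new TreeMap<>();

    /** Records that the value takes effect at the given offset. */
    void addAt(long offset, T value) {
        Map.Entry<Long, T> last = history.lastEntry();
        if (last != null && offset <= last.getKey()) {
            throw new IllegalArgumentException("Offsets must be strictly increasing");
        }
        history.put(offset, value);
    }

    /** Returns the value in effect at the given offset, if any. */
    Optional<T> valueAtOrBefore(long offset) {
        Map.Entry<Long, T> entry = history.floorEntry(offset);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    /** Returns the most recent value, if any. */
    Optional<T> lastValue() {
        Map.Entry<Long, T> entry = history.lastEntry();
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    /** Log truncated to endOffset (exclusive): drop entries at or after it. */
    void truncateNewEntries(long endOffset) {
        history.tailMap(endOffset, true).clear();
    }

    /** Snapshot taken at startOffset: keep the entry in effect there, drop older ones. */
    void truncateOldEntries(long startOffset) {
        Long floor = history.floorKey(startOffset);
        if (floor != null) {
            history.headMap(floor, false).clear();
        }
    }

    public static void main(String[] args) {
        OffsetHistorySketch<String> voters = new OffsetHistorySketch<>();
        voters.addAt(0L, "voters{1,2,3}");
        voters.addAt(10L, "voters{1,2,3,4}");
        System.out.println(voters.valueAtOrBefore(5L));
        System.out.println(voters.lastValue());
    }
}
```

A sorted map keeps both truncation directions cheap, which fits the stated goal of tracking only the entries between the latest snapshot and the LEO.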

Reviewers: Jason Gustafson <jason@confluent.io>
2024-05-04 12:43:16 -07:00
Colin Patrick McCabe a3f2414990
KAFKA-16624: Don't generate useless PartitionChangeRecord on older MV (#15810)
Fix a case where we could generate useless PartitionChangeRecords on metadata versions older than
3.6-IV0. This could happen in the case where we had an ISR with only one broker in it, and we were
trying to go down to a fully empty ISR. In this case, PartitionChangeBuilder would block the record
from going down to a fully empty ISR (since that is not valid in these pre-KIP-966 metadata
versions), but it would still emit the record, even though it had no effect.

Reviewers: Igor Soarez <soarez@apple.com>
2024-05-02 09:23:25 -07:00
mannoopj 31355ef8f9
KAFKA-16475: add more tests to TopicImageNodeTest (#15735)
Add more test cases to TopicImageNodeTest.java.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2024-04-30 14:59:00 -07:00
Luke Chen ec151c8278
KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors (#15732)
While migrating from ZK to KRaft, we encountered an issue where the migration hung and the ZkMigrationState could not move to the MIGRATION state. The cause is that pollEvent did not retry on a retriable MigrationClientException (ZK client retriable errors) as it should. After such an error, the poll event stopped polling, which caused the KRaftMigrationDriver to hang. This PR fixes that and adds a test.

Reviewers: Luke Chen <showuon@gmail.com>, Igor Soarez <soarez@apple.com>, Akhilesh C <akhileshchg@users.noreply.github.com>
2024-04-29 17:44:47 +08:00
Mickael Maison df4ef5a621
MINOR: Various cleanups in metadata (#15806)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-26 05:50:40 +08:00
TingIāu "Ting" Kì 864744ffd4
KAFKA-16610 Replace "Map#entrySet#forEach" by "Map#forEach" (#15795)
Reviewers: Apoorv Mittal <amittal@confluent.io>, Igor Soarez <soarez@apple.com>
2024-04-25 01:52:24 +01:00
Omnia Ibrahim cfe5ab5cf2
KAFKA-15853 Move quota configs into server-common package (#15774)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-04-24 13:05:18 +08:00
ilyazr b254e787cb
KAFKA-16466: Log exception message for non-fault errors in QuorumController (#15701)
The generic error handler of QuorumController didn't log the exception message for non-fault errors, which includes very useful debugging info.

Reviewers: Igor Soarez <soarez@apple.com>
2024-04-23 02:01:36 +01:00
Johnny Hsu 5193eb9323
KAFKA-16475: add test for TopicImageNodeTest (#15720)
Add a unit test for TopicImageNodeTest.

Co-authored-by: Johnny Hsu <johnnyhsu@fb.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2024-04-16 10:20:34 -07:00
Kuan-Po (Cooper) Tseng 315cd83048
MINOR: remove redundant argument in log (#15699)
remove redundant argument in log

Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-04-12 08:36:33 +08:00
Igor Soarez f6c9feea76
KAFKA-16297: Race condition while promoting future replica (#15557)
If a future replica doesn't get promoted, any directory reassignment sent to the controller should be reversed.

The current logic is already addressing the case when a replica hasn't yet been promoted and the controller hasn't yet acknowledged the directory reassignment. However, it doesn't cover the case where the replica does not get promoted due to a directory failure after the controller has acknowledged the reassignment but before the future replica catches up again and is promoted to main replica.

Reviewers: Luke Chen <showuon@gmail.com>
2024-04-10 17:57:05 +08:00
Calvin Liu 6de58d2731
MINOR; Missing minISR config should log a debug message (#15529)
Log a debug message when the min isr configuration is missing for a topic.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2024-04-06 17:41:32 -07:00
Calvin Liu 376e9e20db
KAFKA-15586: Clean shutdown detection - server side (#14706)
If the broker registers with the same broker epoch as the previous session, it is recognized as a clean shutdown. Otherwise, it is an unclean shutdown, and the replica will be removed from any ELR.

Reviewers: Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>
2024-04-04 09:12:05 -04:00
Alyssa Huang 4ccbf1634a
MINOR: Metadata image test improvements (#15373)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
2024-03-28 11:22:02 +01:00
PoAn Yang 6f8d4fe26b
KAFKA-15949: Unify metadata.version format in log and error message (#15505)
Log and error messages referred to metadata.version in different ways, such as metadata version or metadataVersion. Unify the format as metadata.version.

Reviewers: Luke Chen <showuon@gmail.com>
2024-03-26 20:09:29 +08:00
Alyssa Huang 03f7b5aa3a
KAFKA-16206: Fix unnecessary topic config deletion during ZK migration (#14206)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ron Dagostino <rndgstn@gmail.com>
2024-03-21 15:38:42 +01:00
Kuan-Po (Cooper) Tseng 12a1d85362
KAFKA-12187 replace assertTrue(obj instanceof X) with assertInstanceOf (#15512)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-03-20 10:36:25 +08:00
Colin Patrick McCabe 2c1943d836
MINOR: remove test constructor for PartitionAssignment (#15435)
Remove the test constructor for PartitionAssignment and remove the TODO.
Also add KRaftClusterTest.testCreatePartitions to get more coverage for
createPartitions.

Reviewers: David Arthur <mumrah@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-03-05 12:02:19 -08:00
Luke Chen 98fb3bd304
MINOR: log error when initialLoadFuture is not done in authorizer (#14953)
Currently, when initializing StandardAuthorizer, it waits until all ACLs are loaded and then completes the initialLoadFuture. So, checking the logs, we'll see:

2023-12-06 14:07:50,325 INFO [StandardAuthorizer 1] Initialized with 6 acl(s). (org.apache.kafka.metadata.authorizer.StandardAuthorizerData) [kafka-1-metadata-loader-event-handler]
2023-12-06 14:07:50,325 INFO [StandardAuthorizer 1] Completed initial ACL load process. (org.apache.kafka.metadata.authorizer.StandardAuthorizerData) [kafka-1-metadata-loader-event-handler]

But then, when shutting down the node, we will also see this error:

2023-12-06 14:12:32,752 ERROR [StandardAuthorizer 1] Failed to complete initial ACL load process. (org.apache.kafka.metadata.authorizer.StandardAuthorizerData) [kafka-1-metadata-loader-event-handler]
java.util.concurrent.TimeoutException
	at kafka.server.metadata.AclPublisher.close(AclPublisher.scala:98)
	at org.apache.kafka.image.loader.MetadataLoader.closePublisher(MetadataLoader.java:568)
	at org.apache.kafka.image.loader.MetadataLoader.lambda$removeAndClosePublisher$7(MetadataLoader.java:528)
	at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
	at java.base/java.lang.Thread.run(Thread.java:840)

This is confusing. It happens because we try to complete the authorizer's initial load, and complete the initialLoadFuture if it is not done, but we log the error whether or not it was already completed. This patch improves the logging.

Reviewers: Josep Prat <josep.prat@aiven.io>
2024-02-17 13:58:11 +08:00
Calvin Liu 756f44a3e5
KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR (#15359)
When completing the partition reassignment, the new ISR should have all the target replicas.
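
The completion condition stated above is a subset check. A minimal sketch, assuming replicas are identified by integer broker IDs; the class and method names are hypothetical and the real controller logic involves more state than this.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Hypothetical check: a reassignment may complete only when every target
// replica is already in the ISR.
final class ReassignmentCheck {
    static boolean canComplete(List<Integer> targetReplicas, List<Integer> isr) {
        return new HashSet<>(isr).containsAll(targetReplicas);
    }

    public static void main(String[] args) {
        System.out.println(canComplete(Arrays.asList(1, 2, 3), Arrays.asList(1, 2, 3, 4)));
        System.out.println(canComplete(Arrays.asList(1, 2, 3), Arrays.asList(1, 2)));
    }
}
```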

Reviewers: Justine Olshan <jolshan@confluent.io>, David Mao <dmao@confluent.io>
2024-02-16 10:27:43 -08:00
David Arthur c000b1fae2
MINOR: Fix some MetadataDelta handling issues during ZK migration (#15327)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2024-02-07 12:54:59 -08:00
David Arthur 12ce9c7f98
KAFKA-16216: Reduce batch size for initial metadata load during ZK migration
During migration from ZK mode to KRaft mode, there is a step where the kcontrollers load all of the
data from ZK into the metadata log. Previously, we were using a batch size of 1000 for this, but
200 seems better. This PR also adds an internal configuration to control this batch size, for
testing purposes.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2024-02-01 15:48:52 -08:00
David Arthur 16ed7357b1
KAFKA-16171: Fix ZK migration controller race (#15238)
This patch causes the active KRaftMigrationDriver to reload the /migration ZK state after electing
itself as the leader in ZK. This closes a race condition where the previous active controller could
make an update to /migration after the new leader was elected. The update race was not actually a
problem regarding the data since both controllers would be syncing the same state from KRaft to ZK,
but the change to the znode causes the new controller to fail on the zk version check on
/migration.

This patch also fixes an as-yet-unseen bug where an active controller that failed to elect itself via
claimControllerLeadership would not retry.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2024-01-29 13:51:45 -08:00
Calvin Liu 7e5ef9b509
KAFKA-15585: Implement DescribeTopicPartitions RPC on broker (#14612)
This patch implements the new DescribeTopicPartitions RPC as defined in KIP-966 (ELR). Additionally, this patch adds a broker config "max.request.partition.size.limit" which limits the number of partitions returned by the new RPC.

Reviewers: Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>
2024-01-24 15:16:09 -05:00
Justine Olshan e00d36b9c0
KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders (#15139)
This originally was #14489 which covered 2 aspects -- reloading on partition epoch changes where leader epoch did not change and reloading when leader epoch changed but we were already the leader.

I've cut out the second part of the change since the first part is much simpler.

This change redefines the TopicDelta fields to better distinguish when a leader is elected (leader epoch bump) from when a leader has ISR/replica changes (partition epoch bump). There are some cases where we bump the partition epoch but not the leader epoch. In those cases, we do not need to run operations that only care about the leader epoch bump (i.e., onElect callbacks).

Reviewers: Artem Livshits <alivshits@confluent.io>, José Armando García Sancio <jsancio@apache.org>
2024-01-23 14:58:53 -08:00
David Arthur 7bf7fd99a5
KAFKA-16078: Be more consistent about getting the latest MetadataVersion
This PR creates MetadataVersion.latestTesting to represent the highest metadata version (which may be unstable) and MetadataVersion.latestProduction to represent the latest version that should be used in production. It fixes a few cases where the broker was advertising that it supported the testing versions even when unstable metadata versions had not been configured.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
2024-01-17 14:59:22 -08:00
Divij Vaidya 65424ab484
MINOR: New year code cleanup - include final keyword (#15072)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Sagar Rao <sagarmeansocean@gmail.com>
2024-01-11 17:53:35 +01:00
Igor Soarez f385ef468b
KAFKA-15364: Replay BrokerRegistrationChangeRecord.logDirs (#14998)
Any directory changes must be considered when replaying
BrokerRegistrationChangeRecord. This is necessary
to persist directory failures in the cluster metadata,
which #14902 missed.

Reviewers: Omnia G.H Ibrahim <o.g.h.ibrahim@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
2023-12-18 15:43:28 -05:00
David Arthur 7f763d327f
KAFKA-16007 Merge batch records during ZK migration (#15007)
To avoid creating lots of small KRaft batches during the ZK migration, this patch adds a mechanism to merge batches into sizes of at least 1000. This has the effect of reducing the number of batches sent to Raft which reduces the amount of time spent blocking.

Since migrations use metadata transactions, the batch boundaries for migrated records are not significant. Even in light of that, this implementation does not break up existing batches. It will only combine them into a larger batch to meet the minimum size.
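
The merging rule described above (combine small batches until a minimum size is reached, never split an incoming batch) can be sketched as a single pass. This is an illustration under assumptions, not the migration code: the class name, the generic record type, and the handling of a trailing undersized batch are all choices made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batch merger: emits batches of at least minSize records
// without ever breaking up an incoming batch.
final class BatchMergeSketch {
    static <T> List<List<T>> merge(List<List<T>> batches, int minSize) {
        List<List<T>> merged = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (List<T> batch : batches) {
            current.addAll(batch); // an incoming batch is always kept whole
            if (current.size() >= minSize) {
                merged.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            merged.add(current); // trailing remainder may be below minSize
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Integer>> input = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            List<Integer> batch = new ArrayList<>();
            for (int j = 0; j < 300; j++) {
                batch.add(j);
            }
            input.add(batch);
        }
        for (List<Integer> batch : merge(input, 1000)) {
            System.out.println(batch.size());
        }
    }
}
```

Because whole batches are appended, a merged batch can exceed the minimum size, which is consistent with the message saying existing batches are not broken up.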

Reviewers: José Armando García Sancio <jsancio@apache.org>
2023-12-15 19:33:15 -05:00
Proven Provenzano b0e99b5593
KAFKA-15922: Bump MetadataVersion to support JBOD with KRaft (#14984)
Moves ELR from MetadataVersion IBP_3_7_IV3 into the new IBP_3_8_IV0 because the ELR feature was not completed before 3.7 reached feature freeze.  Leaves IBP_3_7_IV3 empty -- it is a no-op and is not reused for anything.  Adds the new MetadataVersion IBP_3_7_IV4 for the FETCH request changes from KIP-951, which were mistakenly never associated with a MetadataVersion.  Updates the LATEST_PRODUCTION MetadataVersion to IBP_3_7_IV4 to declare both KRaft JBOD and the KIP-951 changes ready for production use.

Reviewers: Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Ron Dagostino <rdagostino@confluent.io>, Ismael Juma <ismael@juma.me.uk>, José Armando García Sancio <jsancio@apache.org>, Justine Olshan <jolshan@confluent.io>
2023-12-14 10:08:54 -05:00
Omnia Ibrahim 07490b929b
KAFKA-15365: Broker-side replica management changes (#14881)
Reviewers: Igor Soarez <soarez@apple.com>, Ron Dagostino <rndgstn@gmail.com>, Proven Provenzano <pprovenzano@confluent.io>
2023-12-11 09:34:22 -05:00
Colin Patrick McCabe 32fdb8d173
KAFKA-15956: MetadataShell must take the log directory lock when reading (#14899)
MetadataShell should take an advisory lock on the .lock file of the directory it is reading from.
Add an integration test of this functionality in MetadataShellIntegrationTest.java.

Note: in build.gradle, I had to add some dependencies on server-common's test files in order to use
MockFaultHandler, etc.

MetadataBatchLoader.java: fix a case where a log message was incorrect.  The intention was to print
the number equivalent to (offset + index).  Instead it was printing the offset, followed by the
index. So if the offset was 100 and the index was 1, 1001 would be printed rather than 101.
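
One common way this kind of bug arises in Java is string concatenation without parentheses. The sketch below illustrates that pattern under that assumption; the class and method names are hypothetical, and the actual MetadataBatchLoader code may have produced the message differently.

```java
// Hypothetical illustration, not the Kafka source.
final class OffsetLogMessage {
    static String buggy(long baseOffset, int index) {
        // String concatenation is evaluated left to right, so the two numbers
        // are appended as text instead of being added.
        return "at offset " + baseOffset + index;
    }

    static String fixed(long baseOffset, int index) {
        // Parentheses force the arithmetic sum before concatenation.
        return "at offset " + (baseOffset + index);
    }

    public static void main(String[] args) {
        System.out.println(buggy(100L, 1)); // at offset 1001
        System.out.println(fixed(100L, 1)); // at offset 101
    }
}
```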

Co-authored-by: Igor Soarez <i@soarez.me>
Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org>
2023-12-10 19:18:34 -08:00
Igor Soarez 93b6df6173
KAFKA-15364: Handle log directory failure in the Controller (#14902)
When log directories fail, the broker will send a heartbeat listing the failed directories. This
PR implements processing offline directories in the controller's broker heartbeat handling. We
update broker registrations and generate leadership/ISR changes as necessary.

Co-authored-by: Colin P. McCabe <cmccabe@apache.org>
Reviewers: Ron Dagostino <rndgstn@gmail.com>
2023-12-08 14:44:14 -08:00
Igor Soarez c515bf51f8
KAFKA-15426: Process and persist directory assignments
Handle AssignReplicasToDirs requests, persist metadata changes
with new directory assignments and possible leader elections.

Reviewers: Proven Provenzano <pprovenzano@confluent.io>, Ron Dagostino <rndgstn@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2023-12-07 11:44:45 -08:00
Igor Soarez 32576f61ce
MINOR: always register before touch in BrokerHeartbeatManager (#14934)
BrokerHeartbeatManager should require a call to register(brokerId) before touch(brokerId)
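
The ordering requirement can be sketched as follows. The class below is a hypothetical stand-in for illustration, not the actual BrokerHeartbeatManager; only the register and touch method names come from the commit message, and the choice of IllegalStateException is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, not the Kafka class.
final class HeartbeatTracker {
    // Last contact time per registered broker, in nanoseconds.
    private final Map<Integer, Long> lastContactNs = new HashMap<>();

    void register(int brokerId) {
        lastContactNs.putIfAbsent(brokerId, 0L);
    }

    void touch(int brokerId, long nowNs) {
        // Reject heartbeats from brokers that were never registered.
        if (!lastContactNs.containsKey(brokerId)) {
            throw new IllegalStateException("Broker " + brokerId + " is not registered.");
        }
        lastContactNs.put(brokerId, nowNs);
    }

    long lastContact(int brokerId) {
        return lastContactNs.getOrDefault(brokerId, -1L);
    }
}
```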

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ron Dagostino <rndgstn@gmail.com>
2023-12-07 10:13:39 -08:00
Colin Patrick McCabe 969bc7749c
KAFKA-15980: Add the CurrentControllerId metric (#14749)
Add the CurrentControllerId metric as described in KIP-1001. This gives us an easy way to identify the current controller by looking at the metrics of any Kafka node (broker or controller).

Reviewers: David Arthur <mumrah@gmail.com>
2023-12-06 21:03:33 -08:00
Igor Soarez f467f6bb4f
KAFKA-15361: Process and persist dir info with broker registration (#14838)
Part of JBOD KIP-858, https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Proven Provenzano <pprovenzano@confluent.io>, Ron Dagostino <rdagostino@confluent.io>
2023-12-06 16:40:43 -05:00
Colin Patrick McCabe ebae7b26b5
MINOR: fix bug where we weren't registering SnapshotEmitterMetrics (#14918)
Fix a bug where we weren't properly exposing SnapshotEmitterMetrics. Add a test.

Reviewers: David Arthur <mumrah@gmail.com>
2023-12-04 21:32:12 -08:00
Igor Soarez 6b87c85291
KAFKA-15886: Always specify directories for new partition registrations
When creating partition registrations, directories must always be defined.

If creating a partition from a PartitionRecord or PartitionChangeRecord from an older version that
does not support directory assignments, then DirectoryId.MIGRATING is assumed.

If creating a new partition, or triggering a change in assignment, DirectoryId.UNASSIGNED should be
specified, unless the target broker has a single online directory registered, in which case the
replica should be assigned directly to that single directory.
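
The selection rules above can be sketched as a small decision function. This is a simplified illustration: directory IDs are modeled as plain strings, and the class, constants, and method names are hypothetical placeholders for the real DirectoryId values.

```java
import java.util.List;

// Hypothetical sketch; directory IDs are modeled as strings for brevity.
final class DirectoryChooser {
    static final String MIGRATING = "MIGRATING";   // placeholder for DirectoryId.MIGRATING
    static final String UNASSIGNED = "UNASSIGNED"; // placeholder for DirectoryId.UNASSIGNED

    // Records from an older version carry no directory assignment.
    static String forRecordWithoutDirectories() {
        return MIGRATING;
    }

    // New partitions or assignment changes: use the broker's only online
    // directory if there is exactly one, otherwise leave it unassigned.
    static String forNewReplica(List<String> onlineDirs) {
        return onlineDirs.size() == 1 ? onlineDirs.get(0) : UNASSIGNED;
    }
}
```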

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-11-30 14:10:47 -08:00
Colin Patrick McCabe a94bc8d6d5
KAFKA-15922: Add a MetadataVersion for JBOD (#14860)
Assign MetadataVersion.IBP_3_7_IV2 to JBOD.

Move KIP-966 support to MetadataVersion.IBP_3_7_IV3.

Create MetadataVersion.LATEST_PRODUCTION as the latest metadata version that can be used when formatting a
new cluster, or upgrading a cluster using kafka-features.sh. This will allow us to clearly distinguish between stable
and unstable metadata versions for the first time.

Reviewers: Igor Soarez <soarez@apple.com>, Ron Dagostino <rndgstn@gmail.com>, Calvin Liu <caliu@confluent.io>, Proven Provenzano <pprovenzano@confluent.io>
2023-11-30 10:35:13 -08:00
Jason Gustafson a35e021925
MINOR: Fix flaky `MetadataLoaderTest.testNoPublishEmptyImage` (#14875)
There is a race in the assertion on `capturedImages`. Since the future is signaled first, it is still possible to see an empty list. By adding to the collection first, we can ensure the assertion will succeed.
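
The ordering fix can be illustrated with a minimal sketch. The publisher class below is hypothetical and much simpler than the test code in MetadataLoaderTest; it only shows why recording the image before completing the future makes the later assertion safe.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical test helper, not the Kafka test code.
final class CapturingPublisher {
    final List<String> capturedImages = new CopyOnWriteArrayList<>();
    final CompletableFuture<Void> firstPublish = new CompletableFuture<>();

    void onPublish(String image) {
        // Record the image first ...
        capturedImages.add(image);
        // ... then signal waiters, so anyone woken by the future sees a
        // non-empty list.
        firstPublish.complete(null);
    }
}
```

If the two statements were swapped, a test thread waiting on firstPublish could wake up and read capturedImages before the add had happened.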

Reviewers: David Jacot <djacot@confluent.io>
2023-11-30 09:50:19 -08:00
yuyli 937578be65
MINOR: Rename method sendBrokerHeartbeat (#14658)
Rename sendBrokerHeartbeat to sendBrokerHeartbeatToUnfenceBrokers.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-11-27 13:30:40 -08:00
Colin Patrick McCabe 209f268d6c
KAFKA-15860: ControllerRegistration must be written out to the metadata image (#14807)
The ControllerRegistration records added in KIP-919 should be written out to the metadata
image, not just the log.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2023-11-22 21:25:21 -08:00
Igor Soarez e90692246a
KAFKA-15362: Resolve offline replicas in metadata cache (#14737)
The metadata cache now considers registered log directories
and directory assignments when determining offline replicas.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Proven Provenzano <pprovenzano@confluent.io>
2023-11-21 09:40:04 -08:00
Igor Soarez a03a71d7b5
KAFKA-15357: Aggregate and propagate assignments
A new AssignmentsManager accumulates, batches, and sends KIP-858
assignment events to the Controller. Assignments are sent via
AssignReplicasToDirs requests.

Move QuorumTestHarness.formatDirectories into TestUtils so it can be
used in other test contexts.

Fix a bug in ControllerRegistration.java where the wrong version of the
record was being generated in ControllerRegistration.toRecord.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Proven Provenzano <pprovenzano@confluent.io>, Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>
2023-11-16 16:19:49 -08:00
Mickael Maison 832627fc78
MINOR: Various cleanups in metadata (#14734)
- Remove unused code, suppression
- Simplify/fix test assertions
- Javadoc cleanups

Reviewers: Josep Prat <josep.prat@aiven.io>
2023-11-14 09:25:09 +01:00
Proven Provenzano fa472d26a5
MINOR: Update BrokerRegistration to use a Builder
Update BrokerRegistration to use a Builder. This fixes the proliferation of different constructors,
and makes it clear what arguments are being used where.

Reviewers: Colin P. McCabe <cmccabe@confluent.io>
2023-11-09 10:08:31 -08:00
Colin Patrick McCabe 7060c08d6f
MINOR: Rewrite the meta.properties handling code in Java and fix some issues (#14628)
meta.properties files are used by Kafka to identify log directories within the filesystem.
Previously, the code for handling them was in BrokerMetadataCheckpoint.scala. This PR rewrites the
code for handling them as Java and moves it to the apache.kafka.metadata.properties namespace. It
also gets rid of the separate types for v0 and v1 meta.properties objects. Having separate types
wasn't so bad back when we had a strict rule that zk clusters used v0 and kraft clusters used v1.
But ZK migration has blurred the lines. Now, a zk cluster may have either v0 or v1, if it is
migrating, and a kraft cluster may have either v0 or v1, at any time.

The new code distinguishes between an individual meta.properties file, which is represented by
MetaProperties, and a collection of meta.properties files, which is represented by
MetaPropertiesEnsemble. It is useful to have this distinction, because in JBOD mode, even if some
log directories are inaccessible, we can still use the ensemble to extract needed information like
the cluster ID. (Of course, even when not in JBOD mode, KRaft servers have always been able to
configure a metadata log directory separate from the main log directory.)

Since we recently added a unique directory.id to each meta.properties file, the previous convention
of passing a "canonical" MetaProperties object for the cluster around to various places in the code
needs to be revisited. After all, we can no longer assume all of the meta.properties files are the
same. This PR fixes these parts of the code. For example, it fixes the constructors of
ControllerApis and RaftManager to just take a cluster ID, rather than a MetaProperties object. It
fixes some other parts of the code, like the constructor of SharedServer, to take a
MetaPropertiesEnsemble object.

Another goal of this PR was to centralize meta.properties validation a bit more and make it
unit-testable. For this purpose, the PR adds MetaPropertiesEnsemble.verify, and a few other
verification methods. These enforce invariants like "the metadata directory must be readable," and
so on.

Reviewers: Igor Soarez <soarez@apple.com>, David Arthur <mumrah@gmail.com>, Divij Vaidya <diviv@amazon.com>, Proven Provenzano <pprovenzano@confluent.io>
2023-11-09 09:32:35 -08:00
Calvin Liu 505e5b3eaa
KAFKA-15584: Leader election with ELR (#14593)
The patch includes the following changes as part of KIP-966

* Allow ISR shrink to empty
* Allow leader election with ELR members
* Allow electing the last known leader

Reviewers: Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>
2023-11-06 17:21:51 -05:00
Apoorv Mittal a53147e7d9
KAFKA-15673: Adding client metrics resource types (KIP-714) (#14621)
This PR adds resources to store and handle client metrics needed for KIP-714.

Changes include:

Adding CLIENT_METRICS to resource type
Corresponding DYNAMIC client configurations in resources.
Changes to support dynamic loading of configuration on changes.
Changes to support API calls to fetch data stored against the new resource.
Test cases for the changes.

Reviewers: Andrew Schofield <andrew_schofield@uk.ibm.com>, Philip Nee <pnee@confluent.io>, Jun Rao <junrao@gmail.com>
2023-11-03 14:51:50 -07:00
Igor Soarez 0390d5b1a2
KAFKA-15355: Message schema changes (#14290)
Reviewers: Christo Lolov <lolovc@amazon.com>, Colin P. McCabe <cmccabe@apache.org>, Proven Provenzano <pprovenzano@confluent.io>, Ron Dagostino <rdagostino@confluent.io>
2023-11-02 09:46:05 -04:00
Paolo Patierno 2736a2e50a
KAFKA-15689: Logging skipped event when expected migration state is wrong (#14646)
As described in ticket KAFKA-15689, this PR fixes the logging of a migration event when the expected migration state is wrong.

Signed-off-by: Paolo Patierno <ppatierno@live.com>

Reviewers: Luke Chen <showuon@gmail.com>
2023-10-30 17:59:11 +08:00
Paolo Patierno 0c7d1fca92
Using INFO level for migration transition state logging (#14651)
Trivial PR to use the INFO level (instead of DEBUG) for logging the state transition during the ZooKeeper to KRaft migration.
I think this is useful information to log without requiring the user to raise the logging level.

Signed-off-by: Paolo Patierno <ppatierno@live.com>

Reviewers: Luke Chen <showuon@gmail.com>, hudeqi <1217150961@qq.com>
2023-10-30 17:57:26 +08:00
David Arthur 37715862d7
KAFKA-15704: Set missing ZkMigrationReady field on ControllerRegistrationRequest
This field was missed by the initial KIP-919 PR(s). The result is that migrations can't begin since
the controllers will never become ready. This patch fixes that as well as pulls over some fixes
from the 3.6 branch.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-10-27 14:16:24 -07:00
David Arthur 339d2556c6
KAFKA-15605: Fix topic deletion handling during ZK migration (#14545)
This patch adds reconciliation logic to migrating ZK brokers to deal with pending topic deletions as well as missed StopReplicas.

During the hybrid mode of the ZK migration, the KRaft controller is asynchronously sending UMR and LISR to the ZK brokers to propagate metadata. Since this process is essentially "best effort" it is possible for a broker to miss a StopReplicas. The new logic lets the ZK broker examine its local logs compared with the full set of replicas in a "Full" LISR. Any local logs which are not present in the set of replicas in the request are removed from ReplicaManager and marked as "stray".

To avoid inadvertent data loss with this new behavior, the brokers do not delete the "stray" partitions. They will rename the directories and log warning messages during log recovery. It will be up to the operator to manually delete the stray partitions. We can possibly enhance this in the future to clean up old stray logs.

This patch makes use of the previously unused Type field on LeaderAndIsrRequest. This was added as part of KIP-516 but never implemented. Since its introduction, an implicit 0 was sent in all LISR. The KRaft controller will now send a value of 2 to indicate a full LISR (as specified by the KIP). The presence of this value acts as a trigger for the ZK broker to perform the log reconciliation.
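
The reconciliation step amounts to a set difference that is only computed for a full request. The sketch below is an assumption-laden illustration: the class, method, and string-keyed partitions are hypothetical, and only the type value 2 for a full LISR comes from the text above.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch, not the ReplicaManager code.
final class StrayLogFinder {
    static final byte FULL_TYPE = 2; // value the KRaft controller sends for a full LISR

    // Returns local partitions absent from a full LISR; for any other
    // request type nothing is treated as stray.
    static Set<String> findStray(byte requestType, Set<String> localLogs, Set<String> requestReplicas) {
        if (requestType != FULL_TYPE) {
            return Collections.emptySet();
        }
        Set<String> stray = new HashSet<>(localLogs);
        stray.removeAll(requestReplicas);
        return stray;
    }
}
```

In line with the message above, a caller would rename the directories for the returned partitions rather than delete them.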

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-10-26 18:13:52 -04:00
Calvin Liu af747fbfed
KAFKA-15581: Introduce ELR (#14312)
This patch introduces preliminary changes for Eligible Leader Replicas (KIP-966)

* New MetadataVersion 16 (3.7-IV1)
* New record versions for PartitionRecord and PartitionChangeRecord
* New tagged fields on PartitionRecord and PartitionChangeRecord
* New static config "eligible.leader.replicas.enable" to gate the whole feature

Reviewers: Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2023-10-19 14:05:15 -04:00
Calvin Liu 14029e2ddd
KAFKA-15582: Identify clean shutdown broker (#14465)
The PR includes:

* Added a new class of CleanShutdownFile which helps write and read from a clean shutdown file.
* Updated the BrokerRegistration API.
* Client side handling for the broker epoch.
* Minimum work on the controller side.

Reviewers: Jun Rao <junrao@gmail.com>
2023-10-19 10:25:23 -07:00
mannoopj da314ee48c
KAFKA-15532: non active controllers return 0 for ZkWriteBehindLag (#14478)
Since only the active controller is performing the dual-write to ZK during a migration, it should be the only controller
to report the ZkWriteBehindLag metric.

Currently, if the controller fails over during a migration, the previous active controller will incorrectly report its last
value for ZkWriteBehindLag forever. Instead, it should report zero.
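
A minimal sketch of the intended gauge behavior follows. The class and its fields are hypothetical and do not reflect how the controller metrics are actually wired up.

```java
// Hypothetical gauge, not the Kafka metrics code.
final class WriteBehindLagGauge {
    private volatile boolean active;
    private volatile long lastLag;

    void setActive(boolean active) {
        this.active = active;
    }

    void recordLag(long lag) {
        this.lastLag = lag;
    }

    // Only the active controller reports a real lag; others report zero
    // instead of a stale value from before failover.
    long value() {
        return active ? lastLag : 0L;
    }
}
```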

Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
2023-10-16 15:22:50 -07:00
Federico Valeri aec07f76d7
KAFKA-15537: Fix metadata downgrade documentation (#14484)
In KIP-778 we introduced the "unsafe" (lossy) downgrade in case metadata has changes in one of the versions between target and current, as defined in MetadataVersion.

The documentation says it is possible:

"Note that the cluster metadata version cannot be downgraded to a pre-production 3.0.x, 3.1.x, or 3.2.x version once it has been upgraded. However, it is possible to downgrade to production versions such as 3.3-IV0, 3.3-IV1, etc."

The command line tool shows that this doesn't work:

bin/kafka-features.sh --bootstrap-server :9092 downgrade --metadata 3.4 --unsafe
Could not downgrade metadata.version to 8. Invalid metadata.version 8. Unsafe metadata downgrade is not supported in this version.
1 out of 1 operation(s) failed.

Beyond unsafe downgrades, safe metadata downgrades are also not supported in practice. For example, when you upgrade to 3.5, you land on 3.5-IV2 as the metadata version, which has metadata changes and won't let you downgrade. This is currently true for every other release.

This change fixes the documentation to reflect that, and improves the error messages.

Signed-off-by: Federico Valeri <fedevaleri@gmail.com>

Reviewers: Luke Chen <showuon@gmail.com>, Jakub Scholz <github@scholzj.com>
2023-10-12 11:12:44 +08:00
Ritika Reddy bcfc9543d1
MINOR: Move TopicIdPartition class to server-common (#14418)
This patch moves the TopicIdPartition from the metadata module to the server-common module so it can be used by the group-coordinator module as well.

Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, David Jacot <djacot@confluent.io>
2023-09-28 13:55:44 -07:00
Ismael Juma 98febb989a
KAFKA-15485: Fix "this-escape" compiler warnings introduced by JDK 21 (1/N) (#14427)
This is one of the steps required for kafka to compile with Java 21.

For each case, one of the following fixes was applied:
1. Suppress warning if fixing would potentially result in an incompatible change (for public classes)
2. Add final to one or more methods so that the escape is not possible
3. Replace method calls with direct field access.

In addition, we also fix a couple of compiler warnings related to deprecated references in the `core` module.

See the following for more details regarding the new lint warning:
https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831

Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Chris Egerton <chrise@aiven.io>
2023-09-24 05:59:29 -07:00
Alyssa Huang 2d262efb00
[MINOR] QuorumController tests use testToImage (#14405)
2023-09-22 14:50:20 -04:00
Colin Patrick McCabe 7d45d849f8
KAFKA-15458: Fully resolve endpoint information before registering controllers (#14376)
Endpoint information provided by KafkaConfig can be incomplete in two ways. One is that endpoints
using ephemeral ports will show up as using port 0. Another is that endpoints binding to 0.0.0.0
will show up with a null or blank hostname. Because we were not accounting for this in controller
registration, it was leading to a null pointer dereference when we tried to register a controller
using an endpoint defined as PLAINTEXT://:9092.

This PR adds a ListenerInfo class which can fix both of the causes of incomplete endpoint
information. It also handles serialization to and from various RPC and record formats.
This allows us to remove a lot of boilerplate code and standardize the handling of listeners
between BrokerServer and ControllerServer.
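
The two gaps described above (port 0 and a blank hostname) can be sketched as a small resolution function. This is not the ListenerInfo API: the class, method, and parameters below are hypothetical, and the caller is assumed to already know the bound port and a fallback hostname.

```java
// Hypothetical sketch, not the ListenerInfo class.
final class EndpointResolver {
    // Returns "host:port" with both gaps filled in.
    static String resolve(String host, int port, String fallbackHost, int boundPort) {
        // An endpoint such as PLAINTEXT://:9092 has a null or blank hostname.
        String resolvedHost = (host == null || host.trim().isEmpty()) ? fallbackHost : host;
        // Port 0 means "ephemeral"; substitute the port that was actually bound.
        int resolvedPort = (port == 0) ? boundPort : port;
        return resolvedHost + ":" + resolvedPort;
    }
}
```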

Reviewers: David Arthur <mumrah@gmail.com>
2023-09-20 11:44:00 -07:00
David Arthur b24ccd65b7
KAFKA-15441 Allow broker heartbeats to complete in metadata transaction (#14351)
This patch allows broker heartbeat events to be completed while a metadata transaction is in-flight.

More generally, this patch allows any RUNS_IN_PREMIGRATION event to complete while the controller
is in pre-migration mode even if the migration transaction is in-flight.

We had a problem with broker heartbeats timing out because they could not be completed while a large
ZK migration transaction was in-flight. This resulted in the controller fencing all the ZK brokers which 
has many undesirable downstream effects. 

Reviewers: Akhilesh Chaganti <akhileshchg@users.noreply.github.com>, Colin Patrick McCabe <cmccabe@apache.org>
2023-09-08 16:36:13 -04:00
Colin Patrick McCabe 41b695b6e3
KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers (#14306)
Implement KIP-919: Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add
Controller Registration. This KIP adds a new version of DescribeClusterRequest which is supported
by KRaft controllers. It also teaches AdminClient how to use this new DESCRIBE_CLUSTER request to
talk directly with the controller quorum. This is all gated behind a new MetadataVersion,
IBP_3_7_IV0.

In order to share the DESCRIBE_CLUSTER logic between broker and controller, this PR factors it out
into AuthHelper.computeDescribeClusterResponse.

The KIP adds three new error codes: MISMATCHED_ENDPOINT_TYPE, UNSUPPORTED_ENDPOINT_TYPE, and
UNKNOWN_CONTROLLER_ID. The endpoint type errors can be returned from DescribeClusterRequest.

On the controller side, the controllers now try to register themselves with the current active
controller, by sending a CONTROLLER_REGISTRATION request. This, in turn, is converted into a
RegisterControllerRecord by the active controller. ClusterImage, ClusterDelta, and all other
associated classes have been upgraded to propagate the new metadata. In the metadata shell, the
cluster directory now contains both broker and controller subdirectories.

QuorumFeatures previously had a reference to the ApiVersions structure used by the controller's
NetworkClient. Because this PR removes that reference, QuorumFeatures now contains only immutable
data. Specifically, it contains the current node ID, the locally supported features, and the list
of quorum node IDs in the cluster.

Reviewers: David Arthur <mumrah@gmail.com>, Ziming Deng <dengziming1993@gmail.com>, Luke Chen <showuon@gmail.com>
2023-09-07 15:21:52 -07:00
David Arthur 65e2ecffab
KAFKA-15435 Fix counts in MigrationManifest (#14342)
Reviewers: Liu Zeyu <zeyu.luke@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2023-09-06 13:02:13 -04:00
Dimitar Dimitrov 78c59cd2b0
KAFKA-15052 Fix the flaky testBalancePartitionLeaders - part II (#13908)
A follow-up to https://github.com/apache/kafka/pull/13804.
This follow-up adds the alternative fix approach mentioned in
the PR above: bumping the session timeout used in the test
by 1 second.

Reproducing the flake-out locally has been much harder than
on the CI runs, as neither Gradle with Java 11 or Java 14 nor
IntelliJ with Java 14 could show it, but IntelliJ with Java 11
could occasionally reproduce the failure the first time
immediately after a rebuild. While I was unable to see the
failure with the bumped session timeout, the testing procedure
did not provide sufficient reassurance for the fix: even
without it, I would often see hundreds of consecutive
successful test runs when the first run didn't fail.

Reviewers: Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>
2023-09-04 17:02:32 +08:00
David Arthur f2d499e25a
KAFKA-15389: Don't publish until we have replayed at least one record (#14282)
When starting up a controller for the first time (i.e., with an empty log), it is possible for
MetadataLoader to publish an empty MetadataImage before the activation records of the controller
have been written. While this is not a bug, it could be confusing. This patch closes that gap by
waiting for at least one controller record to be committed before the MetadataLoader starts publishing
images.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-08-25 10:41:43 -07:00
Phuc-Hong-Tran 8d12c1175c
KAFKA-15152: Fix incorrect format specifiers when formatting string (#14026)
Reviewers: Divij Vaidya <diviv@amazon.com>

Co-authored-by: phuchong.tran <phuchong.tran@servicenow.com>
2023-08-24 19:38:45 +02:00
Ron Dagostino 8394ddc0d2
MINOR: Move delegation token support to Metadata Version 3.6-IV2 (#14270)
#14083 added support for delegation tokens in KRaft and attached that support to the existing
MetadataVersion 3.6-IV1. This patch moves that support into a separate MetadataVersion 3.6-IV2.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-08-22 16:04:53 -07:00
David Arthur 418b8a6e59
KAFKA-14538 Metadata transactions in MetadataLoader (#14208)
This PR contains three main changes:

- Support for transactions in MetadataLoader
- Abort in-progress transaction during controller failover
- Utilize transactions for ZK to KRaft migration

A new MetadataBatchLoader class is added to decouple the loading of record batches from the
publishing of metadata in MetadataLoader. Since a transaction can span across multiple batches (or
multiple transactions could exist within one batch), some buffering of metadata updates was needed
before publishing out to the MetadataPublishers. MetadataBatchLoader accumulates changes into a
MetadataDelta, and uses a callback to publish to the publishers when needed.

One small oddity with this approach is that since we may be "splitting" batches in some cases, the
number of bytes returned in the LogDeltaManifest has new semantics. The number of bytes included in
a batch is now only included in the last metadata update that is published as a result of a batch.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-08-21 16:02:14 -07:00
Proven Provenzano c2759df067
KAFKA-15219: KRaft support for DelegationTokens (#14083)
Reviewers: David Arthur <mumrah@gmail.com>, Ron Dagostino <rndgstn@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Viktor Somogyi <viktor.somogyi@cloudera.com>
2023-08-19 14:01:08 -04:00
Colin Patrick McCabe adc16d0f31
KAFKA-14538: Implement KRaft metadata transactions in QuorumController
Implement the QuorumController side of KRaft metadata transactions.

As specified in KIP-868, this PR creates a new metadata version, IBP_3_6_IV1, which contains the
three new records: AbortTransactionRecord, BeginTransactionRecord, EndTransactionRecord.

In order to make offset management unit-testable, this PR moves it out of QuorumController.java and
into OffsetControlManager.java. The general approach here is to track the "last stable offset," which is
calculated by looking at the latest committed offset and the in-progress transaction (if any). When
a transaction is aborted, we revert back to this last stable offset. We also revert back to it when
the controller is transitioning from active to inactive.
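
The commit message does not spell out the exact formula, so the sketch below assumes one plausible rule: with no transaction in progress the last stable offset equals the last committed offset, and otherwise it cannot advance past the offset just before the transaction began. The class and method names are hypothetical and are not the OffsetControlManager API.

```java
// Hypothetical sketch of last-stable-offset tracking under the stated assumption.
final class StableOffsetTracker {
    private long lastCommittedOffset = -1L;
    private long transactionStartOffset = -1L; // -1 means no transaction in progress

    void beginTransaction(long startOffset) {
        transactionStartOffset = startOffset;
    }

    void endTransaction() {
        transactionStartOffset = -1L;
    }

    void commitUpTo(long offset) {
        lastCommittedOffset = offset;
    }

    long lastStableOffset() {
        if (transactionStartOffset == -1L) {
            return lastCommittedOffset;
        }
        // Records inside an open transaction are not yet stable.
        return Math.min(transactionStartOffset - 1, lastCommittedOffset);
    }
}
```

Under this rule, aborting a transaction or losing leadership would mean reverting in-memory state to the value returned by lastStableOffset.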

In a follow-up PR, we will add support for the transaction records in MetadataLoader. We will also
add support for automatically aborting pending transactions after a controller failover.

Reviewers: David Arthur <mumrah@gmail.com>
2023-08-14 16:58:56 -07:00
Colin Patrick McCabe 9318b591d7
KAFKA-15318: Update the Authorizer via AclPublisher (#14169)
On the controller, move publishing acls to the Authorizer into a dedicated MetadataPublisher,
AclPublisher. This publisher listens for notifications from MetadataLoader, and receives only
committed data. This brings the controller side in line with how the broker has always worked. It
also avoids some ugly code related to publishing directly from the QuorumController. Most important
of all, it clears the way to implement metadata transactions without worrying about Authorizer
state (since it will be handled by the MetadataLoader, along with other metadata image state).

In AclsDelta, we can remove isSnapshotDelta. We always know when the MetadataLoader is giving us a
snapshot. Also bring AclsDelta in line with the other delta classes, where completeSnapshot
calculates the diff between the previous image and the next one. We don't use this delta (since we
just apply the image directly to the authorizer) but we should have it, for consistency.

Finally, change MockAclMutator to avoid the need to subclass AclControlManager.

Reviewers: David Arthur <mumrah@gmail.com>
2023-08-09 23:54:46 -07:00
David Arthur 32c39c8149
KAFKA-15263 Check KRaftMigrationDriver state in each event (#14115)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-07-28 13:02:47 -04:00
Colin Patrick McCabe 10bcd4fc7f
KAFKA-15213: provide the exact offset to QuorumController.replay (#13643)
Provide the exact record offset to QuorumController.replay() in all cases. There are several situations
where this is useful, such as logging, implementing metadata transactions, or handling broker
registration records.

In the case where the QC is inactive, and simply replaying records, it is easy to compute the exact
record offset from the batch base offset and the record index.

The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can
choose a batch base offset later than the one we expect, if someone else is also adding records.
While the QC is the only entity submitting data records, control records may be added at any time.
In the current implementation, these are really only used for leadership elections. However, this
could change with the addition of quorum reconfiguration or similar features.

Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it
would have resulted in a batch base offset other than what was expected. This in turn will trigger a
controller failover. In the future, if automatically added control records become more common, we
may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But
for now, this will allow us to rely on the offset as correct.
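
The optimistic concurrency check described above can be sketched with a toy in-memory log. Everything here is a hypothetical simplification: the class is not the RaftClient API, IllegalStateException stands in for the new exception type, and control records are modeled as ordinary entries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory log, not the Raft layer.
final class OptimisticLog {
    private final List<String> records = new ArrayList<>();

    // Next offset that would be assigned to an appended record.
    long endOffset() {
        return records.size();
    }

    // Stand-in for control records the Raft layer may add on its own.
    void appendControlRecord(String record) {
        records.add(record);
    }

    // Appends the batch only if it would start at the expected offset.
    // Returns the offset of the last record in the batch.
    long append(long expectedBaseOffset, List<String> batch) {
        long actual = endOffset();
        if (actual != expectedBaseOffset) {
            throw new IllegalStateException(
                "Expected base offset " + expectedBaseOffset + " but the next offset is " + actual);
        }
        records.addAll(batch);
        return actual + batch.size() - 1;
    }
}
```

A writer would read endOffset once, compute exact record offsets from it, and treat a failed append as a signal to give up leadership rather than retry with guessed offsets.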

In order that the active QC can learn what offset to start writing at, the PR also adds a new
RaftClient#endOffset function.

At the Raft level, this PR adds a new exception, UnexpectedBaseOffsetException. This gets thrown
when we request a base offset that doesn't match the one the Raft layer would have given us.
Although this exception should cause a failover, it should not be considered a fault. This
complicated the exception handling a bit and motivated splitting more of it out into the new
EventHandlerExceptionInfo class. This will also let us unit test things like slf4j log messages a
bit better.

Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org>
2023-07-27 17:01:55 -07:00
David Arthur a900794ace
KAFKA-15196 Additional ZK migration metrics (#14028)
This patch adds several metrics defined in KIP-866:

* MigratingZkBrokerCount: the number of zk brokers registered with KRaft
* ZkWriteDeltaTimeMs: time spent writing MetadataDelta to ZK
* ZkWriteSnapshotTimeMs: time spent writing MetadataImage to ZK
* Adds value 4 for "ZK" to ZkMigrationState

Also fixes a typo in the metric name introduced in #14009 (ZKWriteBehindLag -> ZkWriteBehindLag)

Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2023-07-26 12:54:59 -04:00
David Arthur e794bc719a
MINOR: Add a Builder for KRaftMigrationDriver (#14062)
Reviewers: Justine Olshan <jolshan@confluent.io>
2023-07-25 16:05:04 -04:00
Colin Patrick McCabe c7de30f38b
KAFKA-15183: Add more controller, loader, snapshot emitter metrics (#14010)
Implement some of the metrics from KIP-938: Add more metrics for
measuring KRaft performance.

Add these metrics to QuorumControllerMetrics:
    kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
    kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
    kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
    kafka.controller:type=KafkaController,name=NewActiveControllersCount

Create LoaderMetrics with these new metrics:
    kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
    kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount

Create SnapshotEmitterMetrics with these new metrics:
    kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
    kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs

Reviewers: Ron Dagostino <rndgstn@gmail.com>
2023-07-24 21:13:58 -07:00
Owen Leung 4981fa939d
KAFKA-14712: Produce correct error msg with correct metadataversion (#13773)
Fix the confusing error message in ImageWriterOptions

Reviewers: Luke Chen <showuon@gmail.com>, David Arthur <mumrah@gmail.com>
2023-07-24 10:37:23 +08:00
Hailey Ni 9e50f7cdd3
MINOR: Add ZK dual-write lag metric (#14009)
This patch adds ZKWriteBehindLag metric to the KafkaController mbean as specified in KIP-866

Reviewers: David Arthur <mumrah@gmail.com>
2023-07-16 21:23:01 -04:00
David Arthur d9253fed5c
MINOR Improve logging during the ZK to KRaft migration (#14008)
* Adds an exponential backoff to 1m while the controller is waiting for brokers to show up
* Increases one-time logs to INFO
* Adds a summary of the migration records
* Use RecordRedactor for summary of migration batches (TRACE only)
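
The backoff in the first bullet can be sketched as a capped doubling delay. Only the one-minute cap comes from the text above; the initial delay, the doubling factor, and the absence of jitter are assumptions, and the class name is hypothetical.

```java
// Hypothetical sketch of a capped exponential backoff.
final class MigrationBackoff {
    static long delayMs(long initialMs, int attempt, long maxMs) {
        // Cap the shift so the multiplication cannot overflow for
        // reasonable initial delays.
        int shift = Math.min(attempt, 20);
        return Math.min(initialMs * (1L << shift), maxMs);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt <= 10; attempt++) {
            System.out.println(delayMs(100L, attempt, 60_000L));
        }
    }
}
```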

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-07-14 17:44:00 -04:00
David Arthur c84ac00609
Fix compile test error (#14013)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-07-13 23:46:13 -04:00
Colin Patrick McCabe 959f9ca4c0
MINOR: Standardize controller log4j output for replaying records (#13703)
Standardize controller log4j output for replaying important records. The log message should include
the word "replayed" to make it clear that this is a record replay. Log the replay of records for ACLs,
client quotas, and producer IDs, which were previously not logged. Also fix a case where we weren't
logging changes to broker registrations.

AclControlManager, ClientQuotaControlManager, and ProducerIdControlManager didn't previously have a
log4j logger object, so this PR adds one. It also converts them to using Builder objects. This
makes junit tests more readable because we don't need to specify parameters where the test can use
the default (like LogContexts).

Throw an exception in replay if we get another TopicRecord for a topic which already exists.

Example log messages:
  INFO [QuorumController id=3000] Replayed a FeatureLevelRecord setting metadata version to 3.6-IV0
  DEBUG [QuorumController id=3000] Replayed a ZkMigrationStateRecord which did not alter the state from NONE.
  INFO [QuorumController id=3000] Replayed BrokerRegistrationChangeRecord modifying the registration for broker 0: BrokerRegistrationChangeRecord(brokerId=0, brokerEpoch=3, fenced=-1, inControlledShutdown=0)
  INFO [QuorumController id=3000] Replayed ClientQuotaRecord for ClientQuotaEntity(entries={user=testkit}) setting request_percentage to 0.99.

Reviewers: Divij Vaidya <diviv@amazon.com>, Ron Dagostino <rndgstn@gmail.com>, David Arthur <mumrah@gmail.com>
2023-07-13 13:27:15 -07:00
Ron Dagostino edd64fa251
MINOR: more KRaft Metadata Image tests (#13724)
Adds additional testing for the various KRaft *Image classes. For every image that we create we already test that we can get there by applying all the records corresponding to that image written out as a list of records. This patch adds more tests to confirm that we can get to each such final image with intermediate stops at all possible intermediate images along the way.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
2023-07-10 10:01:10 -04:00
David Arthur 726d277c0a
MINOR: Move some things around in KRaftMigrationDriver (#13978)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-07-10 09:05:46 -04:00
andymg3 1223b79973
KAFKA-15149: Fix handling of new partitions in dual-write mode (#13968)
Fixes a bug where we don't send UMR and LISR requests in dual-write mode when new partitions are created. Prior to this patch, KRaftMigrationZkWriter was mutating the internal data-structures of TopicDelta which prevented MigrationPropagator from sending UMR and LISR for the changed partitions.

Reviewers: David Arthur <mumrah@gmail.com>
2023-07-07 10:16:51 -04:00
David Arthur fc7d912e8b
KAFKA-15109 Ensure the leader epoch bump occurs for older MetadataVersions (#13910)
This fixes a regression introduced by the previous KAFKA-15109 commit (d0457f7360 on trunk).

Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@apache.org>
2023-06-27 11:49:20 -04:00
David Arthur 1bf7039999
KAFKA-15098 Allow authorizers to be configured in ZK migration (#13895)
Reviewers: Ron Dagostino <rdagostino@confluent.io>
2023-06-22 09:34:49 -04:00
David Arthur d0457f7360
KAFKA-15109 Don't skip leader epoch bump while in migration mode (#13890)
While in migration mode, the KRaft controller must always bump the leader epoch when shrinking an ISR. 
This is required to maintain compatibility with the ZK brokers. Without the epoch bump, the ZK brokers
will ignore the partition state change present in the LeaderAndIsrRequest since it would not contain a new
leader epoch.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-06-21 13:09:05 -04:00
minjian.cai ba5e1acdfb
MINOR: fix typos for metadata (#13889)
Reviewers: Divij Vaidya <diviv@amazon.com>, Deqi Hu <deqi.hu@shopee.com>
2023-06-21 15:09:15 +02:00
Colin P. McCabe cd3c0ab1a3
KAFKA-15060: fix the ApiVersionManager interface
This PR expands the scope of ApiVersionManager a bit to include returning the current
MetadataVersion and features that are in effect. This is useful in general because that information
needs to be returned in an ApiVersionsResponse. It also allows us to fix the ApiVersionManager
interface so that all subclasses implement all methods of the interface. Having subclasses that
don't implement some methods is dangerous because they could cause exceptions at runtime in
unexpected scenarios.

On the KRaft controller, we were previously performing a read operation in the QuorumController
thread to get the current metadata version and features. With this PR, we now read a volatile
variable maintained by a separate MetadataVersionContextPublisher object. This will improve
performance and simplify the code. It should not change the guarantees we are providing; in both
the old and new scenarios, we need to be robust against version skew scenarios during updates.
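
The "read a volatile variable maintained by a publisher" pattern described above might look roughly like this. It is a sketch under assumptions: `FeaturesSnapshot` and `VersionPublisherSketch` are hypothetical names, and the metadata version is represented as a plain string rather than the real MetadataVersion type.

```java
import java.util.Map;

/** Hypothetical immutable 3-tuple: metadata version, finalized features, feature epoch. */
final class FeaturesSnapshot {
    final String metadataVersion;
    final Map<String, Short> finalizedFeatures;
    final long epoch;

    FeaturesSnapshot(String metadataVersion, Map<String, Short> finalizedFeatures, long epoch) {
        this.metadataVersion = metadataVersion;
        // Defensive immutable copy, so readers never observe later mutations.
        this.finalizedFeatures = Map.copyOf(finalizedFeatures);
        this.epoch = epoch;
    }
}

/** Hypothetical publisher: one thread publishes, any thread reads without queueing an event. */
final class VersionPublisherSketch {
    private volatile FeaturesSnapshot current =
        new FeaturesSnapshot("UNKNOWN", Map.of(), -1L);

    /** Called whenever new metadata is published. */
    void publish(FeaturesSnapshot next) {
        current = next;
    }

    /** Cheap read of the most recently published snapshot; it may lag the log slightly. */
    FeaturesSnapshot current() {
        return current;
    }
}
```

Because readers only ever see a complete, immutable snapshot, they still have to tolerate version skew, as the commit message notes.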

Add a Features class which just has a 3-tuple of metadata version, features, and feature epoch.
Remove MetadataCache.FinalizedFeaturesAndEpoch, since it just duplicates the Features class.
(There are some additional feature-related classes that can be consolidated in a follow-on PR.)

Create a java class, EndpointReadyFutures, for managing the futures associated with individual
authorizer endpoints. This avoids code duplication between ControllerServer and BrokerServer and
makes this code unit-testable.

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>, Luke Chen <showuon@gmail.com>
2023-06-19 16:46:44 -07:00
Luke Chen d3e0b27b24
KAFKA-15040: trigger onLeadershipChange under KRaft mode (#13807)
When a LeaderAndIsr request is received, we notify the remoteLogManager about the leadership change to trigger the subsequent workflow. But LeaderAndIsr requests are not sent in KRaft mode; instead, a topicDelta is received.

This PR fixes the issue by getting the leader and follower changes from the topicDelta and calling rlm.onLeadershipChange to notify the remote log manager. It also adds tests for cases where remote storage is enabled.

Reviewers: Satish Duggana <satishd@apache.org>
2023-06-09 09:53:46 +08:00
José Armando García Sancio 8ad0ed3e61
KAFKA-15021; Skip leader epoch bump on ISR shrink (#13765)
When the KRaft controller removes a replica from the ISR because of the controlled shutdown there is no need for the leader epoch to be increased by the KRaft controller. This is accurate as long as the topic partition leader doesn't add the removed replica back to the ISR.

This change also fixes a bug when computing the HWM. When computing the HWM, replicas that are not eligible to join the ISR but are caught up should not be included in the computation. Otherwise, the HWM will not increase for replica.lag.time.max.ms because the shutting down replica is not sending FETCH requests. Without this additional fix, PRODUCE requests would time out if the request timeout is greater than replica.lag.time.max.ms.

Because of the bug above the KRaft controller needs to check the MV to guarantee that all brokers support this bug fix before skipping the leader epoch bump.

Reviewers: David Mao <47232755+splett2@users.noreply.github.com>, Divij Vaidya <diviv@amazon.com>, David Jacot <djacot@confluent.io>
2023-06-07 07:20:40 -07:00
andymg3 db9d845702
KAFKA-14791; Create a builder for PartitionRegistration (#13788)
This creates a builder for PartitionRegistration. The motivation for the builder is that the constructor of PartitionRegistration has four arguments all of type int[] which makes it easy to make a mistake when using it.
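
To illustrate why a builder helps when several constructor arguments share the type int[], here is a small sketch. It is not the real PartitionRegistration class: `PartitionSketch`, its field names, and the defaults for the optional arrays are assumptions made for the example.

```java
/** Hypothetical stand-in for a record that carries several int[] fields. */
final class PartitionSketch {
    final int[] replicas;
    final int[] isr;
    final int[] removingReplicas;
    final int[] addingReplicas;

    private PartitionSketch(Builder b) {
        this.replicas = b.replicas;
        this.isr = b.isr;
        this.removingReplicas = b.removingReplicas;
        this.addingReplicas = b.addingReplicas;
    }

    static final class Builder {
        private int[] replicas;
        private int[] isr;
        private int[] removingReplicas = new int[0];
        private int[] addingReplicas = new int[0];

        // Named setters make it hard to swap two int[] arguments by accident.
        Builder setReplicas(int... replicas) { this.replicas = replicas.clone(); return this; }
        Builder setIsr(int... isr) { this.isr = isr.clone(); return this; }
        Builder setRemovingReplicas(int... r) { this.removingReplicas = r.clone(); return this; }
        Builder setAddingReplicas(int... a) { this.addingReplicas = a.clone(); return this; }

        PartitionSketch build() {
            if (replicas == null) throw new IllegalStateException("replicas must be set");
            if (isr == null) throw new IllegalStateException("isr must be set");
            return new PartitionSketch(this);
        }
    }
}
```

A call such as `new PartitionSketch.Builder().setReplicas(1, 2, 3).setIsr(1, 2).build()` reads unambiguously, whereas a positional constructor taking four int[] values does not.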

Reviewers: José Armando García Sancio <jsancio@apache.org>
2023-06-06 07:58:23 -07:00
Dimitar Dimitrov 0d5cf4c385
KAFKA-15052 Fix the flaky QuorumControllerTest.testBalancePartitionLeaders (#13804)
In this test broker session timeout is configured aggressively low
(to 1 second) so that fencing can happen without much waiting. Then
in the final portion of the test when brokers should not be fenced
heartbeats are sent roughly 2 times in a session timeout window.
However the first time that's done there's other code between
sending the heartbeat and taking the timestamp, and in local tests
that code can take up to 0.5 seconds (1/2 of the session timeout).
That then can result in all brokers being fenced again which would
fail the test.

This change sends a heartbeat just when a timestamp is taken,
which in local tests reduces flaky failures from 4 out of 50
to 0 out of 50.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-06-04 11:02:27 -07:00
Colin Patrick McCabe 146a6976ae
KAFKA-15048: Improve handling of unexpected quorum controller errors (#13799)
When the active quorum controller encounters an "unexpected" error, such as a NullPointerException,
it currently resigns its leadership. This PR fixes it so that in addition to doing that, it also
increments the metadata error count metric. This will allow us to better track down these errors.

This PR also fixes a minor bug where performing read operations on a standby controller would
result in an unexpected RuntimeException. The bug happened because the standby controller does not
take in-memory snapshots, and read operations were attempting to read from the epoch of the latest
committed offset. The fix is for the standby controller to simply read the latest value of each
data structure. This is always safe, because standby controllers don't contain uncommitted data.

Also, fix a bug where listPartitionReassignments was reading the latest data, rather than data from
the last committed offset.

Reviewers: dengziming <dengziming1993@gmail.com>, David Arthur <mumrah@gmail.com>
2023-06-02 12:51:15 -07:00
David Arthur f499662923
KAFKA-15003: Fix ZK sync logic for partition assignments (#13735)
Fixed the metadata change events in the Migration component to check correctly for the diff in
existing topic changes and replicate the metadata to the Zookeeper. Also, made the diff check
exhaustive enough to handle the partial writes in Zookeeper when we try to replicate changes
using a snapshot in the event of a controller failover.

Add migration client and integration tests to verify the change.

Co-authored-by: Akhilesh Chaganti <akhileshchg@users.noreply.github.com>
2023-06-01 15:43:41 -07:00
David Arthur d27ba5bfba
KAFKA-15010 ZK migration failover support (#13758)
This patch adds snapshot reconciliation during ZK to KRaft migration. This reconciliation happens whenever a snapshot is loaded by KRaft, or during a controller failover. Prior to this patch, it was possible to miss metadata updates coming from KRaft when dual-writing to ZK.

Internally this adds a new state SYNC_KRAFT_TO_ZK to the KRaftMigrationDriver state machine. The controller passes through this state after the initial ZK migration and each time a controller becomes active. 

Logging during dual-write was enhanced to include a count of write operations happening.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-06-01 10:25:46 -04:00
Ron Dagostino e74e5e7ac5
KAFKA-15039: Reduce logging level to trace in PartitionChangeBuilder.… (#13780)
…tryElection()

A CPU profile in a large cluster showed PartitionChangeBuilder.tryElection() taking significant CPU due to logging. We adjust the logging statements in that method for clean elections from DEBUG level to TRACE to mitigate the impact of this logging under normal operations.  Unclean elections are now logged at the INFO level rather than DEBUG.

Reviewers: Jason Gustafson <jason@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
2023-05-31 16:26:01 -04:00
Proven Provenzano 731c8c967e
KAFKA-15017 Fix snapshot load in dual write mode for ClientQuotas and SCRAM (#13757)
This patch fixes the case where a ClientQuota or SCRAM credential was added in KRaft, but not written back to ZK. This missed write only occurred when handling a KRaft snapshot. If the changed quota was processed in a metadata delta (which is the typical case), it would be written to ZK.

Reviewers: David Arthur <mumrah@gmail.com>
2023-05-31 15:42:00 -04:00
Colin Patrick McCabe 9b3db6d50a
KAFKA-15019: Improve handling of broker heartbeat timeouts (#13759)
When the active KRaft controller is overloaded, it will not be able to process broker heartbeat
requests. Instead, they will be timed out. When using the default configuration, this will happen
if the time needed to process a broker heartbeat climbs above a second for a sustained period.
This, in turn, could lead to brokers being improperly fenced when they are still alive.

With this PR, timed out heartbeats will still update the lastContactNs and metadataOffset of the
broker in the BrokerHeartbeatManager. While we don't generate any records, this should still be
adequate to prevent spurious fencing. We also log a message at ERROR level so that this condition
will be more obvious.
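
The idea of recording contact even when a heartbeat is not fully processed can be sketched as below. This is only an illustration, not the BrokerHeartbeatManager implementation: `HeartbeatTrackerSketch` and its method names are hypothetical, and the metadataOffset tracking mentioned above is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker of the last time each broker contacted the controller. */
final class HeartbeatTrackerSketch {
    private final long sessionTimeoutNs;
    private final Map<Integer, Long> lastContactNs = new HashMap<>();

    HeartbeatTrackerSketch(long sessionTimeoutNs) {
        if (sessionTimeoutNs <= 0) {
            throw new IllegalArgumentException("sessionTimeoutNs must be positive");
        }
        this.sessionTimeoutNs = sessionTimeoutNs;
    }

    /** Record contact. Called for every heartbeat, including ones whose handling timed out. */
    void touch(int brokerId, long nowNs) {
        lastContactNs.put(brokerId, nowNs);
    }

    /** A broker is a fencing candidate only if it has been silent longer than the session timeout. */
    boolean isSessionExpired(int brokerId, long nowNs) {
        Long last = lastContactNs.get(brokerId);
        return last == null || nowNs - last > sessionTimeoutNs;
    }
}
```

Since `touch` runs regardless of whether the request later times out, an overloaded controller does not mistake a live broker for a silent one.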

Other small changes in this PR: fix grammar issue in log4j of BrokerHeartbeatManager. Add JavaDoc
for ClusterControlManager#zkMigrationEnabled field. Add builder for ReplicationControlTestContext
to avoid having tons of constructors. Update ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS to
match the default in KafkaConfig.

Reviewers: Ismael Juma <ijuma@apache.org>, Ron Dagostino <rdagostino@confluent.io>
2023-05-31 10:49:05 -07:00
David Arthur 7a679af687
KAFKA-15004: Fix configuration dual-write during migration (#13767)
This patch fixes several small bugs with configuration dual-write during migration.

* Topic configs are not written back to ZK while handling snapshot.
* New broker/topic configs in KRaft that did not exist in ZK will not be written to ZK.
* The sensitive configs are not encoded while writing them to Zookeeper.
* Handle topic configs in ConfigMigrationClient and KRaftMigrationZkWriter#handleConfigsSnapshot

Added tests to ensure we no longer have the above mentioned issues.

Co-authored-by: Akhilesh Chaganti <akhileshchg@users.noreply.github.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-05-27 17:20:44 -04:00
Colin Patrick McCabe b74204fa0a
KAFKA-14996: Handle overly large user operations on the kcontroller (#13742)
Previously, if a user tried to perform an overly large batch operation on the KRaft controller
(such as creating a million topics), we would create a very large number of records in memory. Our
attempt to write these records to the Raft layer would fail, because there were too many to fit in
an atomic batch. This failure, in turn, would trigger a controller failover.

(Note: I am assuming here that no topic creation policy was in place that would prevent the
creation of a million topics. I am also assuming that the user operation must be done atomically,
which is true for all current user operations, since we have not implemented KIP-868 yet.)

With this PR, we fail immediately when the number of records we have generated exceeds the
threshold that we can apply. This failure does not generate a controller failover. We also now
fail with a PolicyViolationException rather than an UnknownServerException.

In order to implement this in a simple way, this PR adds the BoundedList class, which wraps any
list and adds a maximum length. Attempts to grow the list beyond this length cause an exception to
be thrown.
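
A list wrapper of the kind described above could be sketched like this. It is a simplified illustration, not Kafka's BoundedList: the class name `BoundedListSketch` and the use of IllegalStateException are assumptions for the example.

```java
import java.util.AbstractList;
import java.util.List;

/** Hypothetical wrapper that refuses to grow the underlying list past maxLength. */
final class BoundedListSketch<E> extends AbstractList<E> {
    private final int maxLength;
    private final List<E> underlying;

    BoundedListSketch(int maxLength, List<E> underlying) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("maxLength must be positive");
        }
        if (underlying.size() > maxLength) {
            throw new IllegalArgumentException("underlying list is already too long");
        }
        this.maxLength = maxLength;
        this.underlying = underlying;
    }

    @Override public E get(int index) { return underlying.get(index); }
    @Override public int size() { return underlying.size(); }
    @Override public E set(int index, E element) { return underlying.set(index, element); }
    @Override public E remove(int index) { return underlying.remove(index); }

    // AbstractList routes add(E) and addAll(...) through this method,
    // so checking here covers every way the list can grow.
    @Override
    public void add(int index, E element) {
        if (underlying.size() >= maxLength) {
            throw new IllegalStateException("cannot grow list beyond " + maxLength + " elements");
        }
        underlying.add(index, element);
    }
}
```

A caller that generates records into such a list fails as soon as the limit is hit, instead of building an oversized batch first and failing later.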

Reviewers: David Arthur <mumrah@gmail.com>, Ismael Juma <ijuma@apache.org>, Divij Vaidya <diviv@amazon.com>
2023-05-26 13:16:17 -07:00
Manyanda Chitimbo a27c98ca61
MINOR: remove unused variable from QuorumMetaLogListener#handleCommit method (#13611)
The local variable processedRecordsSize was just left over from another commit and can be safely removed.

Reviewers: Divij Vaidya <diviv@amazon.com>, José Armando García Sancio <jsancio@apache.org>
2023-05-26 08:21:40 -07:00
Proven Provenzano 79351ec88e
KAFKA-14970: Fix SCRAM during migration dual-write (#13729)
Fixed a bug during dual write mode where if a user is updating SCRAM records and has no quotas, the SCRAM records will not be written to ZK. Add tests explicitly for this scenario.

Reviewers: David Arthur <mumrah@gmail.com>
2023-05-24 17:01:39 -04:00
Colin P. McCabe 12130cfcec
MINOR: Create the MetadataNode classes to introspect MetadataImage
Metadata image classes such as MetadataImage, ClusterImage, FeaturesImage, and so forth contain
numerous sub-images. This PR adds a structured way of traversing those sub-images. This is useful
for the metadata shell, and also for implementing toString functions.

In both cases, the previous solution was suboptimal. The metadata shell was previously implemented
in an ad-hoc way by mutating text-based tree nodes when records were replayed. This was difficult
to keep in sync with changes to the record types (for example, we forgot to do this for SCRAM). It
was also pretty low-level, being done at a level below that of the image classes. For toString, it
was difficult to keep the implementations consistent previously, and also support both redacted and
non-redacted output.

The metadata shell directory was getting crowded since we never had submodules for it. This PR
creates glob/, command/, node/, and state/ directories to keep things better organized.

Reviewers: David Arthur <mumrah@gmail.com>, Ron Dagostino <rdagostino@confluent.io>
2023-05-23 10:11:26 -07:00
Akhilesh C ea6ce3bf82
KAFKA-15009: Handle new ACLs in KRaft snapshot during migration (#13741)
When loading a snapshot during dual-write mode, we were missing the logic to detect new ACLs that 
had been added on the KRaft side. This patch adds support for finding those new ACLs as well as tests
to verify the correct behavior.

Reviewers: David Arthur <mumrah@gmail.com>
2023-05-23 10:43:02 -04:00
Akhilesh C 6b95581867
KAFKA-15007: Use the correct MetadataVersion in MetadataPropagator (#13732)
Use the MetadataVersion from the MetadataImage passed to MetadataPropagator. This ensures the propagator
sends the right versions of UMR, LISR and StopReplica requests when the migration is in DUAL_WRITE mode.

Reviewers: David Arthur <mumrah@gmail.com>
2023-05-22 14:46:53 -04:00
David Mao d944ef1efb
MINOR: Rename handleSnapshot to handleLoadSnapshot (#13727)
Rename handleSnapshot to handleLoadSnapshot to make it explicit that it is handling snapshot load,
not generation.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
2023-05-17 09:57:24 -07:00
Divij Vaidya bb10ae4273
KAFKA-14962: Trim whitespace from ACL configuration (#13670)
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Christo Lolov <lolovc@amazon.com>
2023-05-12 23:51:00 +05:30
hudeqi 440bed2391
MINOR: code optimization in QuorumController (#13697)
1. Add a hint to the "BROKER_LOGGER" switch case in ConfigResourceExistenceChecker; otherwise, it looks like a redundant case that falls through to the default break and could be deleted outright. I don't know whether adding a hint is better than deleting the case.
2. Delete some unused variables and methods.
3. Add the missing "@Test" annotation to a unit test method.

Reviewers: dengziming <dengziming1993@gmail.com>
2023-05-12 14:03:17 +08:00
dengziming a7c9842f70
KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse (#13679)
The KRaft controller returns empty finalized features in `ApiVersionResponse`. Brokers are not affected by this, so the problem has no impact currently, but it's worth fixing to avoid unexpected problems.

There are also a number of confusing methods in `ApiVersionResponse` that are only used in test code. I moved them to TestUtils to make the code clearer and to force everyone to pass in the correct parameters instead of the default zero parameters (for example, empty supported features and empty finalized features).

Reviewers: Luke Chen <showuon@gmail.com>
2023-05-12 13:46:06 +08:00
David Arthur 0822ce0ed1
KAFKA-14840: Support for snapshots during ZK migration (#13461)
This patch adds support for handling metadata snapshots while in dual-write mode. Prior to this change, if the active
controller loaded a snapshot, it would get out of sync with the ZK state.

In order to reconcile the snapshot state with ZK, several methods were added to scan through the metadata in ZK to
compute differences with the MetadataImage. Since this introduced a lot of code, I opted to split out a lot of methods
from ZkMigrationClient into their own client interfaces, such as TopicMigrationClient, ConfigMigrationClient, and
AclMigrationClient. Each of these has some iterator method that lets the caller examine the ZK state in a single pass
and without using too much memory.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Luke Chen <showuon@gmail.com>
2023-05-05 01:35:26 -07:00
Colin P. McCabe 97c36f3f31
HOTFIX: fix file deletions left out of MINOR: improve QuorumController logging #13540
2023-05-04 12:20:33 -07:00
Colin P. McCabe 63f9f23ec0
MINOR: improve QuorumController logging #13540
When creating the QuorumController, log whether ZK migration is enabled.

When applying a feature level record which sets the metadata version, log the metadata version enum
rather than the numeric feature level.

Improve the logging when we replay snapshots in QuorumController. Log both the beginning and the
end of replay.

When TRACE is enabled, log every record that is replayed in QuorumController. Since some records
may contain sensitive information, create RecordRedactor to assist in logging only what is safe to
put in the log4j file.

Add logging to ControllerPurgatory. Successful completions are logged at DEBUG; failures are logged
at INFO, and additions are logged at TRACE.

Remove SnapshotReason.java, SnapshotReasonTest.java, and
QuorumController#generateSnapshotScheduled. They are deadcode now that snapshot generation moved to
org.apache.kafka.image.publisher.SnapshotGenerator.

Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org>
2023-05-04 11:18:03 -07:00
Proven Provenzano e29942347a
KAFKA-14859: SCRAM ZK to KRaft migration with dual write (#13628)
Handle migrating SCRAM records in ZK when migrating from ZK to KRaft.

This includes handling writing back SCRAM records to ZK while in dual write mode where metadata updates are written to both the KRaft metadata log and to ZK. This allows for rollback of migration to include SCRAM metadata changes.

Reviewers: David Arthur <mumrah@gmail.com>
2023-05-01 09:56:04 -04:00
Luke Chen d796480fe8
KAFKA-14909: check zkMigrationReady tag before migration (#13631)
1. Add ZkMigrationReady to apiVersionsResponse
2. Check that all nodes report ZkMigrationReady before moving to the next migration state

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
2023-04-28 14:35:12 +08:00
Colin Patrick McCabe c708f7ba5f
MINOR: remove spurious call to fatalFaultHandler (#13651)
Remove a spurious call to fatalFaultHandler accidentally introduced by KAFKA-14805.  We should only
invoke the fatal fault handler if we are unable to generate the activation records. If we are
unable to write the activation records, a controller failover should be sufficient to remedy the
situation.

Co-authored-by: Luke Chen <showuon@gmail.com>

Reviewers: Luke Chen <showuon@gmail.com>, David Arthur <mumrah@gmail.com>
2023-04-28 10:15:26 +08:00
Colin P. McCabe 7049333617
KAFKA-14943: Fix ClientQuotaControlManager validation
Don't allow setting negative or zero values for quotas. Don't allow SCRAM mechanism names to be
used as client quota names. SCRAM mechanisms are not client quotas. (The confusion arose because of
internal ZK representation details that treated them both as "client configs.")

Add unit tests for ClientQuotaControlManager.isValidIpEntity and
ClientQuotaControlManager.configKeysForEntityType.

This change doesn't affect metadata record application, only input validation. If there are bad
client quotas that are set currently, this change will not alter the current behavior (of throwing
an exception and ignoring the bad quota).
2023-04-27 10:42:32 -07:00
David Arthur c1b5c75d92
KAFKA-14805 KRaft controller supports pre-migration mode (#13407)
This patch adds the concept of pre-migration mode to the KRaft controller. While in this mode, 
the controller will only allow certain write operations. The purpose of this is to disallow metadata 
changes when the controller is waiting for the ZK migration records to be committed.

The following ControllerWriteEvent operations are permitted in pre-migration mode

* completeActivation
* maybeFenceReplicas
* writeNoOpRecord
* processBrokerHeartbeat
* registerBroker (only for migrating ZK brokers)
* unregisterBroker
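
The allowlist above can be pictured as a simple gate in front of write operations. The sketch below is an assumption-laden illustration, not the QuorumController code: `PreMigrationGateSketch` is a hypothetical class, operations are identified by name only, and the restriction of registerBroker to migrating ZK brokers is not modeled.

```java
import java.util.Set;

/** Hypothetical gate that rejects write operations not permitted in pre-migration mode. */
final class PreMigrationGateSketch {
    // Operation names taken from the list in the commit message.
    private static final Set<String> PERMITTED = Set.of(
        "completeActivation",
        "maybeFenceReplicas",
        "writeNoOpRecord",
        "processBrokerHeartbeat",
        "registerBroker",
        "unregisterBroker");

    private final boolean inPreMigrationMode;

    PreMigrationGateSketch(boolean inPreMigrationMode) {
        this.inPreMigrationMode = inPreMigrationMode;
    }

    /** Outside pre-migration mode every operation passes; inside it, only the allowlist does. */
    boolean isPermitted(String operationName) {
        return !inPreMigrationMode || PERMITTED.contains(operationName);
    }

    /** Throws if the operation must not run yet. */
    void check(String operationName) {
        if (!isPermitted(operationName)) {
            throw new IllegalStateException(
                operationName + " is not allowed while in pre-migration mode");
        }
    }
}
```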

Raft events and other controller events do not follow the same code path as ControllerWriteEvent, 
so they are not affected by this new behavior.

This patch also adds a new metric as defined in KIP-868: kafka.controller:type=KafkaController,name=ZkMigrationState

In order to support upgrades from 3.4.0, this patch also redefines enum value 1 to mean
MIGRATION rather than PRE_MIGRATION.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2023-04-26 10:20:30 -04:00
Manyanda Chitimbo dd63d88ac3
MINOR: fix noticed typo in raft and metadata projects (#13612)
Reviewers: Josep Prat <jlprat@apache.org>
2023-04-21 15:02:06 +02:00
David Jacot 2d0b816150
MINOR: Move `ControllerPurgatory` to `server-common` (#13555)
This patch renames `ControllerPurgatory` to `DeferredEventQueue` and moves it from the `metadata` module to the `server-common` module.

Reviewers: Alexandre Dupriez <alexandre.dupriez@gmail.com>, Ziming Deng <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@apache.org>
2023-04-21 11:19:04 +02:00
Purshotam Chauhan df13775254
KAFKA-14828: Remove R/W locks using persistent data structures (#13437)
Currently, StandardAuthorizer uses a R/W lock for maintaining the consistency of data. For the clusters with very high traffic, we will typically see an increase in latencies whenever a write operation comes. The intent of this PR is to get rid of the R/W lock with the help of immutable or persistent collections. Basically, new object references are used to hold the intermediate state of the write operation. After the completion of the operation, the main reference to the cache is changed to point to the new object. Also, for the read operation, the code is changed such that all accesses to the cache for a single read operation are done to a particular cache object only.
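
The reference-swap approach described above can be sketched as follows. Note the simplification: this example copies the whole set on each write, whereas a persistent collection library shares structure between versions. `AclCacheSketch` and the string representation of ACLs are assumptions for illustration.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical lock-free-for-readers cache: readers see an immutable snapshot. */
final class AclCacheSketch {
    // The "main reference" to the cache; swapped atomically after each write.
    private volatile Set<String> acls = Set.of();

    /** Writers build the new state off to the side, then publish it with one reference write. */
    synchronized void addAcl(String acl) {
        Set<String> next = new HashSet<>(acls);
        next.add(acl);
        acls = Set.copyOf(next);
    }

    /** A single read operation takes the reference once and uses only that snapshot. */
    boolean hasAll(String first, String second) {
        Set<String> snapshot = acls;
        return snapshot.contains(first) && snapshot.contains(second);
    }

    Set<String> snapshot() {
        return acls;
    }
}
```

Readers never block on writers here; the cost is that a read may observe the state from just before a concurrent write.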

In the PR description, you can find the performance of various libraries at the time of both read and write. Read performance is checked with the existing AuthorizerBenchmark. For write performance, a new AuthorizerUpdateBenchmark has been added which evaluates the performance of the addAcl operation.

Reviewers: Ron Dagostino <rndgstn@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Divij Vaidya <diviv@amazon.com>
2023-04-21 14:08:23 +05:30
Proven Provenzano abca86511e
KAFKA-14881: Rework UserScramCredentialRecord (#13513)
Rework UserScramCredentialRecord to store serverKey and StoredKey rather than saltedPassword. This
is necessary to support migration from ZK, since those are the fields we stored in ZK.  Update
latest MetadataVersion to IBP_3_5_IV2 and make SCRAM support conditional on this version.  Moved
ScramCredentialData.java from org.apache.kafka.image to org.apache.kafka.metadata, which seems more
appropriate.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-04-18 09:41:38 -07:00
Manyanda Chitimbo b36a170aa3
MINOR: fix typos in MigrationClient, StandardAuthorizer, StandardAuthorizerData and KafkaConfigSchema files (#13593)
Reviewers: Luke Chen <showuon@gmail.com>
2023-04-18 19:36:56 +08:00
Ron Dagostino e27926f92b
KAFKA-14735: Improve KRaft metadata image change performance at high … (#13280)
topic counts.

Introduces the use of persistent data structures in the KRaft metadata image to avoid copying the entire TopicsImage upon every change.  Performance that was O(<number of topics in the cluster>) is now O(<number of topics changing>), which has dramatic time and GC improvements for the most common topic-related metadata events.  We abstract away the chosen underlying persistent collection library via ImmutableMap<> and ImmutableSet<> interfaces and static factory methods.

Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>, Purshotam Chauhan <pchauhan@confluent.io>
2023-04-17 17:52:28 -04:00
andymg3 c4ad09e47d
MINOR: Add more KRaft reassignment tests (#13521)
Although KAFKA-14808 did not affect KRaft mode, it is important to ensure that we have regression
tests in KRaft mode to prevent a similar bug from appearing there in the future. This PR adds two
tests. First, it adds a test that makes sure we handle what happens when a reassignment completes
and none of the new replicas can be made leader. It's important that we don't keep an old replica as
leader. Second, it adds a test that makes sure we handle new reassignments that don't include a
previous assignment replica that was leader.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-04-12 12:00:35 -07:00
Colin Patrick McCabe f1f35ef1a8
KAFKA-14894: MetadataLoader must call finishSnapshot after loading a snapshot (#13541)
The MetadataLoader must call finishSnapshot after loading a snapshot. This function removes
whatever was in the old snapshot that is not in the new snapshot that was just loaded. While this
is not significant when the old snapshot was the empty snapshot, it is important to do when we are
loading a snapshot on top of an existing non-empty image.
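
What "removing whatever was in the old snapshot that is not in the new snapshot" means can be shown with a small sketch. It is not the MetadataDelta implementation: `SnapshotLoadSketch` is hypothetical and models the image as a flat string map.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical delta that replays a snapshot on top of an existing image. */
final class SnapshotLoadSketch {
    private final Map<String, String> image;
    private final Set<String> seenInSnapshot = new HashSet<>();

    SnapshotLoadSketch(Map<String, String> oldImage) {
        this.image = new HashMap<>(oldImage);
    }

    /** Apply one record from the snapshot being loaded. */
    void replay(String key, String value) {
        image.put(key, value);
        seenInSnapshot.add(key);
    }

    /** Drop every entry inherited from the old image that the snapshot did not mention. */
    void finishSnapshot() {
        image.keySet().retainAll(seenInSnapshot);
    }

    Map<String, String> result() {
        return Map.copyOf(image);
    }
}
```

If the old image is empty, `finishSnapshot` has nothing to remove, which matches the observation above that the call only matters when loading on top of a non-empty image.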

In initializeNewPublishers, the newly installed publishers should be given a MetadataDelta based on
MetadataImage.EMPTY, reflecting the fact that they are seeing everything for the first time.

Reviewers: David Arthur <mumrah@gmail.com>
2023-04-11 15:02:33 -07:00
José Armando García Sancio 672dd3ab6a
KAFKA-13020; Implement reading Snapshot log append timestamp (#13345)
The SnapshotReader exposes the "last contained log time". This is mainly used during snapshot cleanup. The previous implementation used the append time of the snapshot record. This is not accurate as this is the time when the snapshot was created and not the log append time of the last record included in the snapshot.

The log append time of the last record included in the snapshot is stored in the header control record of the snapshot. The header control record is the first record of the snapshot.

To be able to read this record, this change extends the RecordsIterator to decode and expose the control records in the Records type.

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
2023-04-07 09:25:54 -07:00
Calvin Liu 8c88cdb718
KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request. (#13408)
This is the second part of [KIP-903](https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR). It updates the AlterPartitionRequest:
- Deprecate the NewIsr field
- Create a new field BrokerState with BrokerId and BrokerEpoch
- Bump the AlterPartition version to 3

With this change, the Quorum Controller is enabled to reject stale AlterPartition requests.
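
The staleness check enabled by the new BrokerState field can be sketched as a comparison of broker epochs. This is a simplified illustration under assumptions: `StaleIsrCheckSketch` is hypothetical, both sides are modeled as maps from broker id to epoch, and details of the real protocol (error codes, special epoch values) are not modeled.

```java
import java.util.Map;

/** Hypothetical check that a proposed ISR only names brokers at their current epoch. */
final class StaleIsrCheckSketch {
    /**
     * @param registeredEpochs broker id to the epoch the controller currently has registered
     * @param proposedIsr      broker id to the epoch the leader saw when proposing the ISR
     * @return true if every proposed ISR member matches the registered epoch
     */
    static boolean isCurrent(Map<Integer, Long> registeredEpochs, Map<Integer, Long> proposedIsr) {
        for (Map.Entry<Integer, Long> entry : proposedIsr.entrySet()) {
            Long registered = registeredEpochs.get(entry.getKey());
            // An unknown broker or a mismatched epoch means the request is stale.
            if (registered == null || !registered.equals(entry.getValue())) {
                return false;
            }
        }
        return true;
    }
}
```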

Reviewers: Jun Rao <junrao@gmail.com>, David Jacot <djacot@confluent.io>
2023-03-31 11:27:42 +02:00
andymg3 887d05559f
MINOR: Create only one FeatureControlManager instance in ReplicationControlManagerTest (#13468)
This is a small patch to make it so we only create one FeatureControlManager instance in ReplicationControlManagerTest. Currently we create two, which isn't needed. It's also a bit confusing because the ReplicationControlTestContext object ends up having a different FeatureControlManager reference than the one its own ReplicationControlManager instance has a reference to.

Reviewers: José Armando García Sancio <jsancio@apache.org>, dengziming <dengziming1993@gmail.com>
2023-03-29 19:10:03 -07:00
Colin Patrick McCabe 09e59bc776
KAFKA-14857: Fix some MetadataLoader bugs (#13462)
The MetadataLoader is not supposed to publish metadata updates until we have loaded up to the high
water mark. Previously, this logic was broken, and we published updates immediately. This PR fixes
that and adds a junit test.

Another issue is that the MetadataLoader previously assumed that we would periodically get
callbacks from the Raft layer even if nothing had happened. We relied on this to install new
publishers in a timely fashion, for example. However, in older MetadataVersions that don't include
NoOpRecord, this is not a safe assumption.

Aside from the above changes, also fix a deadlock in SnapshotGeneratorTest, fix the log prefix for
BrokerLifecycleManager, and remove metadata publishers on brokerserver shutdown (like we do for
controllers).

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
2023-03-29 12:30:12 -07:00
andymg3 379b6978a0
KAFKA-14829: Consolidate reassignment logic into PartitionReassignmentReplicas (#13440)
Currently, we have various bits of reassignment logic spread across different classes. For example, ReplicationControlManager contains logic for when a reassignment is in progress, which is duplicated in PartitionChangeBuilder. Another example is PartitionReassignmentRevert, which contains logic for how to undo/revert a reassignment. The idea here is to move the logic to PartitionReassignmentReplicas so it's more testable and easier to reason about.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2023-03-29 10:12:40 -07:00
David Arthur f1b3732fa6
KAFKA-14796 Migrate ACLs from AclAuthorizor to KRaft (#13368)
This patch refactors the loadCache method in AclAuthorizer to make it reusable by ZkMigrationClient.
The loaded ACLs are converted to AccessControlEntryRecord. I noticed we still have the defunct
AccessControlRecord, so I've deleted it.

Also included here are the methods to write ACL changes back to ZK while in dual-write mode.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Colin P. McCabe <cmccabe@apache.org>
2023-03-27 16:12:02 -07:00
Colin Patrick McCabe ed400e4c0d
KAFKA-14835: Create ControllerMetadataMetricsPublisher (#13438)
Separate out KRaft controller metrics into two groups: metrics directly managed by the
QuorumController, and metrics handled by an external publisher. This separation of concerns makes
the code easier to reason about, by clarifying what metrics can be changed where.

The external publisher, ControllerServerMetricsPublisher, handles all metrics which are related to
the content of metadata, for example metrics about the number of topics or the number of partitions.
It fits into the MetadataLoader metadata publishing framework as another publisher. Since
ControllerServerMetricsPublisher operates off of a MetadataImage, we don't have to create
(essentially) another copy of the metadata in memory, as ControllerMetricsManager. This reduces
memory consumption. Another benefit of operating off of the MetadataImage is that we don't have to
have special handling for each record type, like we do now in ControllerMetricsManager.

Reviewers: David Arthur <mumrah@gmail.com>
2023-03-24 11:26:53 -07:00
andymg3 df5850274d
MINOR: Expand use of PartitionAssignment (#13402)
Updates ReplicationControlManager and PartitionReassignmentReplicas to use PartitionAssignment.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2023-03-20 13:44:54 -07:00
Colin Patrick McCabe ddd652c672
MINOR: Standardize KRaft logging, thread names, and terminology (#13390)
Standardize KRaft thread names.

- Always use kebab case. That is, "my-thread-name".

- Thread prefixes are just strings, not Option[String] or Optional<String>.
  If you don't want a prefix, use the empty string.

- Thread prefixes end in a dash (except the empty prefix). Then you can
  calculate thread names as $prefix + "my-thread-name"

- Broker-only components get "broker-$id-" as a thread name prefix. For example, "broker-1-"

- Controller-only components get "controller-$id-" as a thread name prefix. For example, "controller-1-"

- Shared components get "kafka-$id-" as a thread name prefix. For example, "kafka-0-"

- Always pass a prefix to KafkaEventQueue, so that threads have names like
  "broker-0-metadata-loader-event-handler" rather than "event-handler". Prior to this PR, we had
  several threads just named "EventHandler" which was not helpful for debugging.

- QuorumController thread name is "quorum-controller-123-event-handler"

- Don't set a thread prefix for replication threads started by ReplicaManager. They run only on the
  broker, and already include the broker ID.
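
The thread naming rules above can be sketched roughly as follows. This is only an illustration of the convention: `ThreadNameSketch` and its methods are hypothetical names and are not taken from the Kafka code base.

```java
// Illustrative sketch of the thread naming rules; not code from the Kafka repository.
final class ThreadNameSketch {
    // Broker-only components get "broker-$id-" as a prefix.
    static String brokerPrefix(int nodeId) { return "broker-" + nodeId + "-"; }

    // Controller-only components get "controller-$id-" as a prefix.
    static String controllerPrefix(int nodeId) { return "controller-" + nodeId + "-"; }

    // Shared components get "kafka-$id-" as a prefix.
    static String sharedPrefix(int nodeId) { return "kafka-" + nodeId + "-"; }

    // Prefixes are plain strings. The empty string means "no prefix";
    // every other prefix must end in a dash so names are just prefix + name.
    static String threadName(String prefix, String name) {
        if (!prefix.isEmpty() && !prefix.endsWith("-")) {
            throw new IllegalArgumentException("non-empty prefix must end in a dash: " + prefix);
        }
        return prefix + name;
    }

    public static void main(String[] args) {
        // prints "broker-0-metadata-loader-event-handler"
        System.out.println(threadName(brokerPrefix(0), "metadata-loader-event-handler"));
        // prints "my-thread-name"
        System.out.println(threadName("", "my-thread-name"));
    }
}
```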

Standardize KRaft slf4j log prefixes.

- Names should be of the form "[ComponentName id=$id] ". So for a ControllerServer with ID 123, we
  will have "[ControllerServer id=123] "

- For the QuorumController class, use the prefix "[QuorumController id=$id] " rather than
  "[Controller <nodeId>] ", to make it clearer that this is a KRaft controller.

- In BrokerLifecycleManager, add isZkBroker=true to the log prefix for the migration case.
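
As a rough illustration of the log prefix format described above (a sketch only; `LogPrefixSketch` is a hypothetical helper, not a Kafka class):

```java
// Illustrative sketch of the "[ComponentName id=$id] " log prefix format; not Kafka code.
final class LogPrefixSketch {
    // Builds a prefix such as "[ControllerServer id=123] ", including the trailing space.
    static String logPrefix(String componentName, int id) {
        return "[" + componentName + " id=" + id + "] ";
    }

    public static void main(String[] args) {
        // prints "[QuorumController id=123] becoming active"
        System.out.println(logPrefix("QuorumController", 123) + "becoming active");
    }
}
```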

Standardize KRaft terminology.

- All synonyms of combined mode (colocated, coresident, etc.) should be replaced by "combined"

- All synonyms of isolated mode (remote, non-colocated, distributed, etc.) should be replaced by
  "isolated".
2023-03-16 15:33:03 -07:00
David Arthur 5dcdf71dec
MINOR: Improved error handling in ZK migration (#13372)
This patch fixes many small issues to improve error handling and logging during the ZK migration. A test was added
to simulate a ZK session expiration to ensure the correctness of the migration driver.

With this change, ZK errors thrown during the migration will not hit the fault handler registered with
KRaftMigrationDriver, but they will be logged.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-03-16 14:21:18 -07:00
Calvin Liu 79b5f7f1ce
KAFKA-14617: Add ReplicaState to FetchRequest (KIP-903) (#13323)
This patch is the first part of KIP-903. It updates the FetchRequest to include the new tagged ReplicaState field which replaces the now deprecated ReplicaId field. The FetchRequest version is bumped to version 15 and the MetadataVersion to 3.5-IV1.

Reviewers: David Jacot <djacot@confluent.io>
2023-03-16 14:04:34 +01:00
Colin Patrick McCabe aaa976a340
MINOR: Some metadata publishing fixes and refactors (#13337)
This PR refactors MetadataPublisher's interface a bit. There is now an onControllerChange
callback. This is something that some publishers might want. A good example is ZkMigrationClient.
Instead of two different publish functions (one for snapshots, one for log deltas), we now have a single onMetadataUpdate function. Most publishers didn't want to do anything different in those two cases.
The ones that do want to do something different for snapshots can always check the manifest type.
The close function now has a default empty implementation, since most publishers didn't need to do
anything there.

Move the SCRAM logic out of BrokerMetadataPublisher and run it on the controller as well.

On the broker, simply use dynamicClientQuotaPublisher to handle dynamic client quotas changes.
That is what the controller already does, and the code is exactly the same in both cases.

Fix the logging in FutureUtils.waitWithLogging a bit. Previously, when invoked from BrokerServer
or ControllerServer, it did not include the standard "[Controller 123] " style prefix indicating server
name and ID. This was confusing, especially when debugging junit tests.

Reviewers: Ron Dagostino <rdagostino@confluent.io>, David Arthur <mumrah@gmail.com>
2023-03-09 14:52:40 -08:00
andymg3 1394675900
MINOR: Add unclean field of PartitionReassignmentRevert to hashCode Equals and toString (#13370)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-03-09 10:26:41 -08:00
Ron Dagostino e3817cac89
KAFKA-14351: Controller Mutation Quota for KRaft (#13116)
Implement KIP-599 controller mutation quotas for the KRaft controller. These quotas apply to create
topics, create partitions, and delete topic operations. They are specified in terms of number of
partitions.

The approach taken here is to reuse the ControllerMutationQuotaManager that is also used in ZK
mode. The quotas are implemented as Sensor objects and Sensor.checkQuotas enforces the quota,
whereas Sensor.record notes that new partitions have been modified. While ControllerApis handles
fetching the Sensor objects, we must make the final callback to check the quotas from within
QuorumController. The reason is because only QuorumController knows the final number of partitions
that must be modified. (As one example, up-to-date information about the number of partitions that
will be deleted when a topic is deleted is really only available in QuorumController.)

For quota enforcement, the logic is already in place. The KRaft controller is expected to set the
throttle time in the response that is embedded in EnvelopeResponse, but it does not actually apply
the throttle because there is no client connection to throttle. Instead, the broker that forwarded
the request is expected to return the throttle value from the controller and to throttle the client
connection. It also applies its own request quota, so the enforced/returned quota is the maximum of
the two.
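
The "maximum of the two" rule described above amounts to the following. This is a minimal sketch with hypothetical names (`ThrottleSketch`, `effectiveThrottleMs`); the real broker code path is more involved.

```java
// Illustrative sketch: the forwarding broker combines the controller's throttle time
// with its own request quota throttle. Names here are hypothetical, not Kafka APIs.
final class ThrottleSketch {
    // controllerThrottleMs: throttle time set by the KRaft controller in the embedded response.
    // brokerRequestThrottleMs: throttle time computed by the broker's own request quota.
    static int effectiveThrottleMs(int controllerThrottleMs, int brokerRequestThrottleMs) {
        return Math.max(controllerThrottleMs, brokerRequestThrottleMs);
    }

    public static void main(String[] args) {
        // The controller asks for 500 ms and the broker quota for 120 ms, so this prints 500.
        System.out.println(effectiveThrottleMs(500, 120));
    }
}
```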

This PR also installs a DynamicConfigPublisher in ControllerServer. This allows dynamic
configurations to be published on the controller. Previously, they could be set, but they were not
applied. Note that we still don't have a good way to set node-level configurations for isolated
controllers. However, this will allow us to set cluster configs (aka default node configs) and have
them take effect on the controllers.

In a similar vein, this PR separates out the dynamic client quota publisher logic used on the
broker into DynamicClientQuotaPublisher. We can now install this on both BrokerServer and
ControllerServer. This makes dynamically configuring quotas (such as controller mutation quotas)
possible.

Also add a ducktape test, controller_mutation_quota_test.py.

Reviewers: David Jacot <djacot@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Colin P. McCabe <cmccabe@apache.org>
2023-03-07 11:25:34 -08:00
Christo Lolov 5b295293c0
MINOR: Remove unnecessary toString(); fix comment references (#13212)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>, Lucas Brutschy <lbrutschy@confluent.io>
2023-03-06 18:39:04 +01:00
Proven Provenzano 38c409cf33
KAFKA-14084: SCRAM support in KRaft. (#13114)
This commit adds support to store the SCRAM credentials in a cluster with KRaft quorum servers and
no ZK cluster backing the metadata. This includes creating ScramControlManager in the controller,
and adding support for SCRAM to MetadataImage and MetadataDelta.

Change UserScramCredentialRecord to contain only a single tuple (name, mechanism, salt, pw, iter)
rather than a mapping between name and a list. This will avoid creating an excessively large record
if a single user has many entries. Because record ID 11 (UserScramCredentialRecord) has not been
used before, this is a compatible change. SCRAM will be supported in 3.5-IV0 and later.

This commit does not include KIP-900 SCRAM bootstrapping support, or updating the credential cache
on the controller (as opposed to broker). We will implement these in follow-on commits.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2023-03-03 10:23:34 -08:00
Colin P. McCabe 6b89672b5e MINOR: some ZK migration code cleanups.
Some minor improvements to the JavaDoc for ZkMigrationState.

Rename MigrationState to MigrationDriverState to avoid confusion with ZkMigrationState.

Remove ClusterImage#zkBrokers. This costs O(num_brokers) time to calculate, but is only ever used
when in migration state. It should just be calculated in the migration code. (Additionally, the
function ClusterImage.zkBrokers() returns something other than ClusterImage#zkBrokers, which is
confusing.)

Also remove ClusterDelta#liveZkBrokerIdChanges. This is only used in one place, and it's easy to
calculate it there. In general we should avoid providing expensive accessors unless absolutely
necessary. Expensive code should look expensive: if people want to iterate over all brokers, they
can write a loop to do that rather than hiding it inside an accessor.
2023-02-28 13:59:07 -08:00
Purshotam Chauhan c39123d83d
KAKFA-14733: Added a few missing checks for Kraft Authorizer and updated AclAuthorizerTest to run tests for both zk and kraft (#13282)
Added the following checks - 
* In StandardAuthorizerData.authorize() to fail if `patternType` other than `LITERAL` is passed.
* In AclControlManager.addAcl() to fail if Resource Name is null or empty.

Also, `AclAuthorizerTest` includes a lot of tests covering various scenarios that are missing in `StandardAuthorizerTest`. This PR changes the AclAuthorizerTest to run tests for both `zk` and `kraft` modes - 
* Rename AclAuthorizerTest -> AuthorizerTest
* Parameterize relevant tests to run for both modes

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
2023-02-21 19:21:15 +05:30
Christo Lolov ba0c5b0902
MINOR: Simplify JUnit assertions in tests; remove accidental unnecessary code in tests (#13219)
* assertEquals called on array
* Method is identical to its super method
* Simplifiable assertions
* Unused imports

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
2023-02-16 16:13:31 +01:00
David Arthur cb4d9d1abf
KAFKA-14668 Avoid unnecessary UMR during ZK migration (#13183)
Only send UMR to ZK brokers if the cluster metadata or topic metadata has changed.

Reviewers: Akhilesh C <akhileshchg@users.noreply.github.com>, Colin P. McCabe <cmccabe@apache.org>
2023-02-09 13:24:02 -05:00
Christo Lolov a0a9b6ffea
MINOR: Remove unnecessary code (#13210)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
2023-02-07 17:37:45 +01:00
Ron Dagostino 6d11261d5d
MINOR: IBP_3_4_IV1 should be IBP_3_5_IV0 because it is not in 3.4 (#13198)
The KIP-405 MetadataVersion changes will be released as part of AK 3.5, but were added as IBP_3_4_IV1.
This change fixes them to be IBP_3_5_IV0. There is no incompatibility because this feature has not yet
been released. Also set didMetadataChange to false because KRaft metadata log records did not change.

Reviewers: Satish Duggana <satishd@apache.org>, Christo Lolov <christo_lolov@yahoo.com>, Colin P. McCabe <cmccabe@apache.org>
2023-02-06 10:37:50 -08:00
David Arthur 89a4735c35
KAFKA-14656: Send UMR first during ZK migration (#13159)
When in migration-from-ZK mode and sending RPCs to ZK-based brokers, the KRaft controller must send
full UpdateMetadataRequests prior to sending full LeaderAndIsrRequests. If the controller sends the
requests in the other order, and the ZK-based broker does not already know about some of the nodes
referenced in the LeaderAndIsrRequest, it will reject the request.

This PR includes an integration test, and a number of other small fixes for dual-write.

Co-authored-by: Akhilesh C <akhileshchg@users.noreply.github.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-01-30 22:31:45 -08:00
José Armando García Sancio 058d8d530b
KAFKA-14618; Fix off by one error in snapshot id (#13108)
The KRaft client expects the offset of the snapshot id to be an end offset. End offsets are
exclusive. The MetadataProvenance type was creating a snapshot id using the last contained offset
which is inclusive. This change fixes that and renames some of the fields to make this difference
more obvious.
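
The inclusive/exclusive distinction above comes down to a one-offset adjustment. A minimal sketch, using a hypothetical `SnapshotIdSketch` helper rather than the real MetadataProvenance code:

```java
// Illustrative sketch of converting an inclusive "last contained offset" into the
// exclusive end offset expected in a snapshot id. Not code from the Kafka repository.
final class SnapshotIdSketch {
    // lastContainedOffset is inclusive: it is the offset of the last record in the snapshot.
    // The returned end offset is exclusive: it is the first offset NOT covered by the snapshot.
    static long snapshotEndOffset(long lastContainedOffset) {
        return lastContainedOffset + 1;
    }

    public static void main(String[] args) {
        // A snapshot whose last record is at offset 99 has end offset 100.
        System.out.println(snapshotEndOffset(99)); // prints 100
    }
}
```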

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-01-13 10:06:38 -08:00
andymg3 0d9a7022a4
KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created (#13104)
### JIRA
https://issues.apache.org/jira/browse/KAFKA-14612

### Details
Makes sure we emit `ConfigRecord`s for a topic iff it actually gets created. Currently, we might emit `ConfigRecord`s even if the topic creation fails later in the `createTopics` method.

I created a new method `incrementalAlterConfig` in `ConfigurationControlManager` that is similar to `incrementalAlterConfigs` but it just handles one config at a time. This is used in `ReplicationControlManager` for each topic. By handling one topic's config at a time, it's easier to isolate each topic's config records. This enables us to make sure we only write config records for topics that get created.

I refactored `incrementalAlterConfigResource` to return an `ApiError`. This made it easier to implement the new method `incrementalAlterConfig` in `ConfigurationControlManager` because it then doesn't have to search in the `Map` for the result.
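
The idea of isolating each topic's config records can be sketched as below. This is a simplified model under stated assumptions: records are plain strings, failures are supplied up front via a hypothetical `failingTopics` set, and `PerTopicConfigSketch` is not the real `ReplicationControlManager` logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Illustrative sketch: build config records per topic and keep them only if the
// topic is actually created. Records are modeled as strings; not Kafka code.
final class PerTopicConfigSketch {
    static List<String> createTopics(Map<String, Map<String, String>> requested,
                                     Set<String> failingTopics) {
        List<String> records = new ArrayList<>();
        // TreeMap gives a deterministic iteration order for the example.
        for (Map.Entry<String, Map<String, String>> topic : new TreeMap<>(requested).entrySet()) {
            // Config records for this topic are collected separately from the global list.
            List<String> configRecords = new ArrayList<>();
            for (Map.Entry<String, String> config : new TreeMap<>(topic.getValue()).entrySet()) {
                configRecords.add("ConfigRecord(" + topic.getKey() + ", "
                    + config.getKey() + "=" + config.getValue() + ")");
            }
            if (failingTopics.contains(topic.getKey())) {
                continue; // creation failed later: drop this topic's config records
            }
            records.add("TopicRecord(" + topic.getKey() + ")");
            records.addAll(configRecords);
        }
        return records;
    }
}
```

With topics `a` and `b` requested and `b` failing, only the records for `a` end up in the returned list.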

### Testing
Enhanced pre-existing test `ReplicationControlManagerTest.testCreateTopicsWithConfigs`. I ran the tests without the changes to `ReplicationControlManager` and made sure each assertion ends up failing. Also ran `./gradlew metadata:test --tests org.apache.kafka.controller.ReplicationControlManagerTest`.

Reviewers: Jason Gustafson <jason@confluent.io>
2023-01-12 16:23:57 -08:00
Colin Patrick McCabe 8478bbb589
KAFKA-14601: Improve exception handling in KafkaEventQueue #13089
If KafkaEventQueue gets an InterruptedException while waiting for a condition variable, it
currently exits immediately. Instead, it should complete the remaining events exceptionally and
then execute the cleanup event. This will allow us to finish any necessary cleanup steps.
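
A simplified sketch of that behavior follows. It assumes an `Event` interface loosely modeled on the one described in this commit (a `run` method plus a `handleException` callback); `InterruptDrainSketch` and `drainAfterInterrupt` are hypothetical names and this is not the actual KafkaEventQueue implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustrative sketch: on interruption, fail the remaining events, then run cleanup.
final class InterruptDrainSketch {
    interface Event {
        void run() throws Exception;
        default void handleException(Throwable e) {}
    }

    static void drainAfterInterrupt(Queue<Event> pending, Event cleanupEvent) {
        InterruptedException cause = new InterruptedException("event queue thread was interrupted");
        Event event;
        while ((event = pending.poll()) != null) {
            try {
                event.handleException(cause); // complete the event exceptionally
            } catch (Throwable t) {
                // handleException itself threw; keep draining the remaining events
            }
        }
        try {
            cleanupEvent.run(); // the cleanup event was supplied at construction time
        } catch (Throwable t) {
            try { cleanupEvent.handleException(t); } catch (Throwable ignored) { }
        }
    }

    // Small demonstration: the second event's handler throws, yet cleanup still runs.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Queue<Event> pending = new ArrayDeque<>();
        pending.add(new Event() {
            public void run() { log.add("ran first"); }
            public void handleException(Throwable e) {
                log.add("first failed: " + e.getClass().getSimpleName());
            }
        });
        pending.add(new Event() {
            public void run() { log.add("ran second"); }
            public void handleException(Throwable e) { throw new IllegalStateException("handler bug"); }
        });
        drainAfterInterrupt(pending, () -> log.add("cleanup"));
        return log;
    }
}
```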

In order to do this, we require the cleanup event to be provided when the queue is constructed,
rather than when it's being shut down.

Also, handle cases where Event#handleException itself throws an exception.

Remove timed shutdown from the event queue code since nobody was using it, and it adds complexity.

Add server-common/src/test/resources/test/log4j.properties since this gradle module somehow avoided
having a test log4j.properties up to this point.

Reviewers: David Arthur <mumrah@gmail.com>
2023-01-12 10:03:14 -08:00
David Arthur 0bb05d8679
KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (#13103)
With the new broker epoch validation logic introduced in #12998, we no longer need the ZK broker epoch to be sent to the KRaft controller. This patch removes that epoch and replaces it with a boolean.

Another small fix is included in this patch for controlled shutdown in migration mode. Previously, if a ZK broker was in migration mode, it would always try to do controlled shutdown via BrokerLifecycleManager. Since there is no ordering dependency between bringing up ZK brokers and the KRaft quorum during migration, a ZK broker could be running in migration mode, but talking to a ZK controller. A small check was added to see if the current controller is ZK or KRaft before deciding which controlled shutdown to attempt.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-01-11 14:36:56 -05:00
andymg3 43f531d87a
MINOR: Implement toString method for TopicAssignment and PartitionAssignment (#13101)
Implements `toString` method for classes `TopicAssignment` and `PartitionAssignment`. Also removes the `final` keyword from the constructor arguments for consistency.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2023-01-10 10:00:59 -08:00
Akhilesh C db49070760
KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller. (#12998)
This patch introduces a preliminary state machine that can be used by KRaft
controller to drive online migration from Zk to KRaft.

MigrationState -- Defines the states we can have while migrating from Zk to
KRaft.

KRaftMigrationDriver -- Defines the state transitions, and events to handle
actions like controller change, metadata change, broker change and have
interfaces through which it claims Zk controllership, performs zk writes and
sends RPCs to ZkBrokers.

MigrationClient -- Interface that defines the functions used to claim and
relinquish Zk controllership, and to read from and write to Zk.

Co-authored-by: David Arthur <mumrah@gmail.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-01-09 10:44:11 -08:00
Luke Chen 6b5e9e989b
MINOR: add error reason when controller failed to handle events (#13050)
In KRaft, when the controller fails to handle an event, we log the error and return it to the broker. But in some cases we only log the error class name and return only the error class name to the broker, which is unhelpful for troubleshooting. For example, when broker registration failed with an unsupported version error, it showed:

2022-12-28T17:46:42.876+0800 [DEBUG] [TestEventLogger]     [2022-12-28 17:46:42,877] INFO [Controller 3000] registerBroker: failed with UnsupportedVersionException in 2888 us (org.apache.kafka.controller.QuorumController:447)

2022-12-28T17:46:42.877+0800 [DEBUG] [TestEventLogger]     [2022-12-28 17:46:42,878] INFO [BrokerLifecycleManager id=0] Unable to register broker 0 because the controller returned error UNSUPPORTED_VERSION (kafka.server.BrokerLifecycleManager:66)

Checking the logs, we still don't know which version it supports.
After this PR, it will show:

2022-12-28T17:54:59.671+0800 [DEBUG] [TestEventLogger]     [2022-12-28 17:54:59,671] INFO [Controller 3000] registerBroker: failed with UnsupportedVersionException in 291 us. Reason: Unable to register because the broker does not support version 8 of metadata.version. It wants a version between 4 and 4, inclusive. (org.apache.kafka.controller.QuorumController:447)

2022-12-28T17:54:59.671+0800 [DEBUG] [TestEventLogger]     [2022-12-28 17:54:59,672] INFO [BrokerLifecycleManager id=0] Unable to register broker 0 because the controller returned error UNSUPPORTED_VERSION (kafka.server.BrokerLifecycleManager:66)

Reviewers: dengziming <dengziming1993@gmail.com>, Federico Valeri <fvaleri@redhat.com>
2023-01-07 09:27:59 +08:00
Akhilesh Chaganti 0e51a2026c KAFKA-14458: Introduce RPC support during ZK migration #13028
Add infrastructure for sending UpdateMetadataRequest and LeaderAndIsr RPCs during the migration
process from ZK to KRaft. The new classes use ControllerChannelManager to send the RPCs.  The
information to send comes from MetadataDelta and MetadataImage.

Reviewers: David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2023-01-04 16:54:58 -08:00
Ismael Juma 96d9710c17
KAFKA-14478: Move LogConfig/CleanerConfig and related to storage module (#13049)
Additional notable changes to fix multiple dependency ordering issues:

* Moved `ConfigSynonym` to `server-common`
* Moved synonyms from `LogConfig` to `ServerTopicConfigSynonyms`
* Removed `LogConfigDef` `define` overrides and rely on
   `ServerTopicConfigSynonyms` instead.
* Moved `LogConfig.extractLogConfigMap` to `KafkaConfig`
* Consolidated relevant defaults from `KafkaConfig`/`LogConfig` in the latter
* Consolidate relevant config name definitions in `TopicConfig`
* Move `ThrottledReplicaListValidator` to `storage`

Reviewers: Satish Duggana <satishd@apache.org>, Mickael Maison <mickael.maison@gmail.com>
2023-01-04 02:42:52 -08:00
José Armando García Sancio 44b3177a08
KAFKA-14457; Controller metrics should only expose committed data (#12994)
The controller metrics have three problems: 1) the active controller exposes uncommitted data in the metrics; 2) the active controller doesn't update the metrics when the uncommitted data gets aborted; 3) the controller doesn't update the metrics when the entire state gets reset.

We fix these issues by only updating the metrics when processing committed metadata records and by resetting the metrics when the metadata state is reset.

This change adds a new type `ControllerMetricsManager` which processes committed metadata records and updates the metrics accordingly. This change also removes metrics updating responsibilities from the rest of the controller managers. 

Reviewers: Ron Dagostino <rdagostino@confluent.io>
2022-12-20 10:55:14 -08:00
Satish Duggana 7146ac57ba
[KAFKA-13369] Follower fetch protocol changes for tiered storage. (#11390)
This PR implements the follower fetch protocol as mentioned in KIP-405.

Added a new version for ListOffsets protocol to receive local log start offset on the leader replica. This is used by follower replicas to find the local log start offset on the leader.

Added a new version for FetchRequest protocol to receive OffsetMovedToTieredStorageException error. This is part of the enhanced fetch protocol as described in KIP-405.

We introduced a new field localLogStartOffset to maintain the log start offset in the local logs. Existing logStartOffset will continue to be the log start offset of the effective log that includes the segments in remote storage.

When a follower receives OffsetMovedToTieredStorage, then it tries to build the required state from the leader and remote storage so that it can be ready to move to fetch state.

Introduced RemoteLogManager, which is responsible for:

* initializing RemoteStorageManager and RemoteLogMetadataManager instances.
* receiving leader and follower replica events and partition stop events, and acting on them.
* providing APIs to fetch indexes and metadata about remote log segments.

Followup PRs will add more functionality like copying segments to tiered storage, retention checks to clean local and remote log segments. This will change the local log start offset and make sure the follower fetch protocol works fine for several cases.

You can look at the detailed protocol changes in KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-FollowerReplication

Co-authors: satishd@apache.org, kamal.chandraprakash@gmail.com, yingz@uber.com

Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Cong Ding <cong@ccding.com>, Tirtha Chatterjee <tirtha.p.chatterjee@gmail.com>, Yaodong Yang <yangyaodong88@gmail.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Jun Rao <junrao@gmail.com>
2022-12-17 09:36:44 -08:00
Colin Patrick McCabe 29c09e2ca1
MINOR: ControllerServer should use the new metadata loader and snapshot generator (#12983)
This PR introduces the new metadata loader and snapshot generator. For the time being, they are
only used by the controller, but a PR for the broker will come soon.

The new metadata loader supports adding and removing publishers dynamically. (In contrast, the old
loader only supported adding a single publisher.) It also passes along more information about each
new image that is published. This information can be found in the LogDeltaManifest and
SnapshotManifest classes.

The new snapshot generator replaces the previous logic for generating snapshots in
QuorumController.java and associated classes. The new generator is intended to be shared between
the broker and the controller, so it is decoupled from both.

There are a few small changes to the old snapshot generator in this PR. Specifically, we move the
batch processing time and batch size metrics out of BrokerMetadataListener.scala and into
BrokerServerMetrics.scala.

Finally, fix a case where we are using 'is' rather than '==' for a numeric comparison in
snapshot_test.py.

Reviewers: David Arthur <mumrah@gmail.com>
2022-12-15 16:53:07 -08:00
David Arthur 67c72596af
KAFKA-14448 Let ZK brokers register with KRaft controller (#12965)
Prior to starting a KIP-866 migration, the ZK brokers must register themselves with the active
KRaft controller. The controller waits for all brokers to register in order to verify that all the
brokers can

A) Communicate with the quorum
B) Have the migration config enabled
C) Have the proper IBP set

This patch uses the new isMigratingZkBroker field in BrokerRegistrationRequest and
RegisterBrokerRecord. The type was changed from int8 to bool for BrokerRegistrationRequest (a
mistake from #12860). The ZK brokers use the existing BrokerLifecycleManager class to register and
heartbeat with the controllers.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2022-12-13 13:15:21 -08:00
Jason Gustafson 26a4d42072
MINOR: Pass snapshot ID directly in `RaftClient.createSnapshot` (#12981)
Let `RaftClient.createSnapshot` take the snapshotId directly instead of the committed offset/epoch (which may not exist). 

Reviewers: José Armando García Sancio <jsancio@apache.org>
2022-12-13 10:44:56 -08:00
Colin Patrick McCabe b2dea17041
MINOR: Introduce MetadataProvenance and ImageReWriter (#12964)
Introduce MetadataProvenance to encapsulate the three-tuple of (offset, epoch, timestamp) that is
associated with each MetadataImage, as well as each on-disk snapshot. Also introduce a builder
for MetadataDelta.

Remove offset and epoch tracking from MetadataDelta. We do not really need to know this information
until we are creating the final MetadataImage object. Therefore, this bookkeeping should be done by
the metadata loading code, not inside the delta code, like the other bookkeeping. This simplifies a
lot of tests, as well as simplifying RecordTestUtils.  It also makes more sense for snapshots, where
the offset and epoch are the same for every record.

Add ImageReWriter, an ImageWriter that applies records to a MetadataDelta. This is useful when you
need to create a MetadataDelta object that holds the contents of a MetadataImage. This will be
used in the new image loader code (coming soon).

Add ImageWriterOptionsTest to test ImageWriterOptions.

Reviewers: David Arthur <mumrah@gmail.com>
2022-12-12 09:52:06 -08:00
Purshotam Chauhan c6590ee28b KAFKA-14435: Fix `allow.everyone.if.no.acl.found` config behavior for StandardAuthorizer
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Colin Patrick McCabe <cmccabe@apache.org>
2022-12-09 00:58:13 +05:30
David Arthur d40561e90a
KAFKA-14427 ZK client support for migrations (#12946)
This patch adds support for reading and writing ZooKeeper metadata during a KIP-866 migration.

For reading metadata from ZK, methods from KafkaZkClient and ZkData are reused to ensure we are decoding the JSON consistently.

For writing metadata, we use a new multi-op transaction that ensures only a single controller is writing to ZK. This is similar to the existing multi-op transaction that KafkaController uses, but it also includes a check on the new "/migration" ZNode. The transaction consists of three operations:

* CheckOp on /controller_epoch
* SetDataOp on /migration with zkVersion
* CreateOp/SetDataOp/DeleteOp (the actual operation being applied)

In the case of a batch of operations (such as topic creation), only the final MultiOp has a SetDataOp on /migration while the other requests use a CheckOp (similar to /controller_epoch).
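
The shape of these multi-op requests can be sketched as follows. This is a simplified model: operations are plain strings rather than ZooKeeper `Op` objects, and `MigrationMultiOpSketch` with its `buildBatch` method is a hypothetical name, not the ZkMigrationClient code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of how each write is wrapped in a three-operation MultiOp.
// Operations are modeled as strings; not code from the Kafka repository.
final class MigrationMultiOpSketch {
    static List<List<String>> buildBatch(List<String> operations) {
        List<List<String>> requests = new ArrayList<>();
        for (int i = 0; i < operations.size(); i++) {
            boolean last = i == operations.size() - 1;
            List<String> multiOp = new ArrayList<>();
            // 1. Verify that this controller still owns the controller epoch.
            multiOp.add("CheckOp /controller_epoch");
            // 2. Only the final MultiOp of a batch updates /migration; the others just check it.
            multiOp.add(last ? "SetDataOp /migration" : "CheckOp /migration");
            // 3. The actual operation being applied.
            multiOp.add(operations.get(i));
            requests.add(multiOp);
        }
        return requests;
    }

    public static void main(String[] args) {
        // The znode paths below are examples only.
        for (List<String> multiOp : buildBatch(List.of(
                "CreateOp /brokers/topics/foo",
                "CreateOp /brokers/topics/foo/partitions"))) {
            System.out.println(multiOp);
        }
    }
}
```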

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>
2022-12-08 13:14:01 -05:00