As part of KIP-853, kafka-storage.sh now has two new flags: --standalone and --initial-voters. This PR implements both flags.
There are currently two valid ways to format a cluster:
1) The pre-KIP-853 way, where you use a statically configured controller quorum. In this case, neither --standalone nor --initial-voters may be specified, and kraft.version must be set to 0.
2) The KIP-853 way, where one of --standalone and --initial-voters must be specified with the initial value of the dynamic controller quorum. In this case, kraft.version must be set to 1.
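A hedged usage sketch of the two modes (the cluster id, config file, and voter list below are placeholders; --standalone and --initial-voters come from this PR, and the exact --initial-voters value format is described by KIP-853 and the tool's help output):
bin/kafka-storage.sh format --config controller.properties --cluster-id <cluster-id> --standalone
bin/kafka-storage.sh format --config controller.properties --cluster-id <cluster-id> --initial-voters <voter-list>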
This PR moves the formatting logic out of StorageTool.scala and into Formatter.java. The tool file was never intended to get so huge, or to implement complex logic like generating metadata records. Those things should be done by code in the metadata or raft gradle modules. This is also useful for junit tests, which often need to do formatting. (The 'info' and 'random-uuid' commands remain in StorageTool.scala, for now.)
Reviewers: José Armando García Sancio <jsancio@apache.org>
As discussed in #16657 (comment), we should make the logger static to avoid creating multiple logger instances.
I used the regex private.*Logger.*LoggerFactory to find all matching declarations and checked whether each logger needs to be static.
There are some exceptions where loggers don't need to be static:
1) Loggers in inner classes, since Java 8 doesn't support static fields in inner classes.
https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java#L3676
2) Custom loggers for each instance (non-static + non-final). In this case, multiple logger instances are actually needed.
https://github.com/apache/kafka/blob/trunk/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java#L166
3) Loggers initialized in the constructor via LogContext. Many non-static but final loggers fall into this category, which is why the regex includes .*LoggerFactory, so that only loggers assigned an initial value at declaration are checked.
4) protected final Logger log = Logger.getLogger(getClass())
This lets a subclass log with the subclass name instead of the superclass name.
But in this case, if the logger's access modifier is private, that purpose cannot be achieved, since the subclass cannot access the logger defined in the superclass. So when the access modifier is private, we can replace getClass() with <className>.class.
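A minimal illustration of the two patterns (slf4j-style; class names are illustrative):

// Typical case: one shared, static logger per class.
private static final Logger LOG = LoggerFactory.getLogger(SomeManager.class);

// Pattern 4: per-instance so subclasses log under their own class name; must not be
// private, otherwise subclasses cannot reach it and <className>.class should be used instead.
protected final Logger log = LoggerFactory.getLogger(getClass());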
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This change includes adding transaction.version (part of KIP-1022)
New transaction version 1 is introduced to support writing flexible fields in transaction state log messages.
Transaction version 2 is created in anticipation for further KIP-890 changes.
Neither is made production ready. Tests for the new transaction versions and the new MV are created.
Also includes a change to not report a feature as supported if its range is 0-0.
Reviewers: Jun Rao <junrao@apache.org>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
Introduce the KRaftVersion enum to describe the current value of kraft.version. Change a bunch of places in the code that were using raw shorts over to using this new enum.
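A rough sketch of the shape of such an enum (constant and method names here are illustrative, not necessarily the ones added):

public enum KRaftVersion {
    KRAFT_VERSION_0((short) 0),   // static controller quorum
    KRAFT_VERSION_1((short) 1);   // dynamic controller quorum (KIP-853)

    private final short featureLevel;

    KRaftVersion(short featureLevel) {
        this.featureLevel = featureLevel;
    }

    public short featureLevel() {
        return featureLevel;
    }
}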
In BrokerServer.scala, fix a bug that could cause null pointer exceptions during shutdown if we tried to shut down before fully coming up.
Do not send finalized features that are finalized as level 0, since it is a no-op.
Reviewers: dengziming <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@apache.org>
When becoming the active KRaftMigrationDriver, there is another race condition similar to KAFKA-16171. This time, the race is due to a stale read from ZK. After writing to /controller and /controller_epoch, it is possible that a read on /migration is not linearized with the writes that were just made. In other words, we get a stale read on /migration. This leads to an inability to sync metadata to ZK due to incorrect zkVersion on the migration ZNode.
The non-linearizability of reads is in fact documented behavior for ZK, so we need to handle it.
To fix the stale read, this patch adds a write to /migration after updating /controller and /controller_epoch. This allows us to learn the correct zkVersion for the migration ZNode before leaving the BECOME_CONTROLLER state.
This patch also adds a check on the current leader epoch when running certain events in KRaftMigrationDriver. Historically, we did not include this check because it is not necessary for correctness. Writes to ZK are gated on the /controller_epoch zkVersion, and RPCs sent to brokers are gated on the controller epoch. However, during a time of rapid failover, there is a lot of processing happening on the controller (i.e., full metadata sync to ZK and full UMRs sent to brokers), so it is best to avoid running events we know will fail.
There is also a small fix in here to improve the logging of ZK operations. The log messages are changed to past tense to reflect the fact that they have already happened by the time the log message is created.
Reviewers: Igor Soarez <soarez@apple.com>
Create 3 new metadata versions:
- 3.8-IV0, for the upcoming 3.8 release.
- 3.9-IV0, to add support for KIP-1005.
- 3.9-IV1, as the new release vehicle for KIP-966.
Create ListOffsetRequest v9, which will be used in 3.9-IV0 to support KIP-1005. v9 is currently an unstable API version.
Reviewers: Jun Rao <junrao@gmail.com>, Justine Olshan <jolshan@confluent.io>
Add field tieredEpoch to RemoteLogSegmentMetadata
Update relevant tests
Add two fields tieredEpoch and tieredState to TopicRecord.json
Reviewers: Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>
* KAFKA-16952: Do not bump broker epoch when re-registering the same incarnation
As part of KIP-858 (Handle JBOD broker disk failure in KRaft), we added some code that caused the
broker to re-register itself when transitioning from a MetadataVersion that did not support broker
directory IDs, to one that did. This code was necessary because otherwise the controller would not
be aware of what directories the broker held.
However, prior to this PR, the re-registration process acted exactly like a full registration. That
is, it bumped the broker epoch (which is meant to only be bumped on broker restart). This PR fixes
the code to keep the broker epoch the same if the incarnation ID is the same.
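A hedged sketch of that check (names are hypothetical, not the actual ClusterControlManager code):

// Same incarnation ID means the same broker process is re-registering,
// so keep the existing broker epoch instead of bumping it.
long brokerEpoch = (existing != null && existing.incarnationId().equals(request.incarnationId()))
    ? existing.epoch()
    : allocateNewBrokerEpoch();   // hypothetical helper for the full-registration path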
There are some other minor improvements here:
- The previous logic relied on a complicated combination of request version and previous broker
epoch to understand if the request came from the same broker or not. This is not needed: either
the incarnation ID is the same and it's the same process, or it is not and it isn't.
- We now log whether we're amending a registration, registering a previously unknown broker, or
replacing a previous registration.
- Move changes to the HeartbeatManager to the end of the function, so that we will not do them if
any validation step fails. Log4j messages are also generated at the end, for the same reason.
Reviewers: Ron Dagostino <rndgstn@gmail.com>
When PartitionRegistration#merge() reads a PartitionChangeRecord
from an older MetadataVersion, with a replica assignment change
and without #directories() set, it produces a directory assignment
of DirectoryId.UNASSIGNED. This is problematic because the MetadataVersion
may not yet support directory assignments, leading to an
UnwritableMetadataException in PartitionRegistration#toRecord.
Since the Controller always sets directories on PartitionChangeRecord
if the MetadataVersion supports it, via PartitionChangeBuilder,
there's no need for PartitionRegistration#merge() to populate
directories upon a replica assignment change.
Reviewers: Luke Chen <showuon@gmail.com>
This patch fixes a typo in MetadataVersion.IBP_4_0_IV0. It should be 0 not O.
Reviewers: Justine Olshan <jolshan@confluent.io>, Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
When upgrading from a MetadataVersion older than 3.7-IV2, we need to resend the broker registration, so that the controller can record the storage directories. The current code for doing this has several problems, however. One is that it tends to trigger even in cases where we don't actually need it. Another is that when re-registering the broker, the broker is marked as fenced.
This PR moves the handling of the re-registration case out of BrokerMetadataPublisher and into BrokerRegistrationTracker. The re-registration code there will only trigger in the case where the broker sees an existing registration for itself with no directories set. This is much more targeted than the original code.
Additionally, in ClusterControlManager, when re-registering the same broker, we now preserve its fencing and shutdown state, rather than clearing those. (There isn't any good reason re-registering the same broker should clear these things... this was purely an oversight.) Note that we can tell the broker is "the same" because it has the same IncarnationId.
Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Igor Soarez <soarez@apple.com>
This patch introduces the `group.version` feature flag with one version:
1) Version 1 enables the new consumer group rebalance protocol (KIP-848).
Reviewers: Justine Olshan <jolshan@confluent.io>
As per KIP-1022, we will rename the unstable metadata versions enabled config to support all feature versions.
Features is also updated to return the latest production and latest testing versions of each feature.
A feature is production ready when the corresponding metadata version (bootstrapMetadataVersion) is production ready.
Adds tests for the feature usage of the unstableFeatureVersionsEnabled config.
Reviewers: David Jacot <djacot@confluent.io>, Jun Rao <junrao@gmail.com>
As part of KIP-1022, I have created an interface for all the new features to be used when parsing the command line arguments, doing validations, getting default versions, etc.
I've also added the --feature flag to the storage tool to show how it will be used.
Created a TestFeatureVersion to show an implementation of the interface (besides MetadataVersion which is unique) and added tests using this new test feature.
I will add the unstable config and tests in a followup.
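A rough sketch of what such a feature-version interface could look like (method names here are hypothetical, not the exact ones added by this PR):

public interface FeatureVersion {
    String featureName();                          // name used with --feature on the command line
    short featureLevel();                          // the level this version represents
    MetadataVersion bootstrapMetadataVersion();    // MV at which this level is considered production ready
}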
Reviewers: David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jun Rao <junrao@apache.org>
Adds support for the KafkaRaftClient to read the control records KRaftVersionRecord and VotersRecord in the snapshot and log. As the control records in the KRaft partition are read, the replica's known set of voters are updated. This change also contains the necessary changes to include the control records when a snapshot is generated by the KRaft state machine.
It is important to note that this commit changes the code and the in-memory state to track the sets of voters but it doesn't change any data that is externally exposed. It doesn't change the RPCs, data stored on disk or configuration.
When the KRaft replica starts, the PartitionListener reads the latest snapshot and then the log segments up to the LEO, updating the in-memory state as it reads KRaftVersionRecord and VotersRecord. When the replica (leader and follower) appends to the log, the PartitionListener catches up to the new LEO. When the replica truncates the log because of a diverging epoch, the PartitionListener also truncates the in-memory state to the new LEO. When the state machine generates a new snapshot, the PartitionListener trims any prefix entries that are no longer needed. This is all done to minimize the amount of data tracked in memory and to make sure that it matches the state on disk.
To implement the functionality described above this commit also makes the following changes:
Adds control records for KRaftVersionRecord and VotersRecord. KRaftVersionRecord describes the finalized kraft.version supported by all of the replicas. VotersRecord describes the set of voters at a specific offset.
Changes Kafka's feature version to support 0 as the smallest valid value. This is needed because the default value for kraft.version is 0.
Refactors FileRawSnapshotWriter so that it doesn't directly call the onSnapshotFrozen callback. It adds NotifyingRawSnapshotWriter for calling such callbacks. This reorganization is needed because in this change both the KafkaMetadataLog and the KafkaRaftClient need to react to snapshots getting frozen.
Cleans up KafkaRaftClient's initialization. Removes initialize from RaftClient - this is an implementation detail that doesn't need to be exposed in the interface. Removes RaftConfig.AddressSpec and simplifies the bootstrapping of the static voter's address. The bootstrapping of the address is delayed because of tests. We should be able to simplify this further in future commits.
Update the DumpLogSegment CLI to support the new control records KRaftVersionRecord and VotersRecord.
Fix the RecordsSnapshotReader implementations so that the iterator includes control records. RecordsIterator is extended to support reading the new control records.
Improve the BatchAccumulator implementation to allow multiple control records in one control batch. This is needed so that KRaft can make sure that VotersRecord is included in the same batch as the control record (KRaftVersionRecord) that upgrades the kraft.version to 1.
Add a History interface and default implementation TreeMapHistory. This is used to track all of the sets of voters between the latest snapshot and the LEO. This is needed so that KafkaRaftClient can query for the latest set of voters and so that KafkaRaftClient can include the correct set of voters when the state machine generates a new snapshot at a given offset.
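A simplified sketch of such a history structure (method names are illustrative; the real interface may differ), assuming java.util.Optional:

interface History<T> {
    void addAt(long offset, T value);             // record a new value starting at this offset
    Optional<T> valueAtOrBefore(long offset);     // latest value whose offset is <= the given offset
    void truncateNewEntries(long endOffset);      // drop entries at or after endOffset (log truncation)
    void truncateOldEntries(long startOffset);    // drop entries before startOffset (snapshot trim)
}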
Add a builder pattern for RecordsSnapshotWriter. The new builder pattern also implements including the KRaftVersionRecord and VotersRecord control records in the snapshot as necessary. A KRaftVersionRecord should be appended if the kraft.version is greater than 0 at the snapshot's offset. Similarly, a VotersRecord should be appended to the snapshot with the latest value up to the snapshot's offset.
Reviewers: Jason Gustafson <jason@confluent.io>
Fix a case where we could generate useless PartitionChangeRecords on metadata versions older than
3.6-IV0. This could happen in the case where we had an ISR with only one broker in it, and we were
trying to go down to a fully empty ISR. In this case, PartitionChangeBuilder would block the record
from taking the ISR down to fully empty (since that is not valid in these pre-KIP-966 metadata
versions), but it would still emit the record, even though it had no effect.
Reviewers: Igor Soarez <soarez@apple.com>
When running the ZK to KRaft migration process, we encountered an issue where the migration hangs and the ZkMigrationState cannot move to the MIGRATION state. This is because pollEvent didn't retry on the retriable MigrationClientException (ZK client retriable errors) when it should have; as a result, the poll event stops polling, which leaves the KRaftMigrationDriver hanging. This PR fixes the retry behavior and adds a test.
Reviewers: Luke Chen <showuon@gmail.com>, Igor Soarez<soarez@apple.com>, Akhilesh C <akhileshchg@users.noreply.github.com>
The generic error handler of QuorumController didn't log the exception message for non-fault errors, even though that message includes very useful debugging info.
Reviewers: Igor Soarez <soarez@apple.com>
If a future replica doesn't get promoted, any directory reassignment sent to the controller should be reversed.
The current logic is already addressing the case when a replica hasn't yet been promoted and the controller hasn't yet acknowledged the directory reassignment. However, it doesn't cover the case where the replica does not get promoted due to a directory failure after the controller has acknowledged the reassignment but before the future replica catches up again and is promoted to main replica.
Reviewers: Luke Chen <showuon@gmail.com>
If the broker registers with the same broker epoch as the previous session, it is recognized as a clean shutdown. Otherwise, it is an unclean shutdown. This replica will be removed from any ELR.
Reviewers: Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>
The codebase used different spellings for metadata.version, such as "metadata version" or "metadataVersion". Unify the format as metadata.version.
Reviewers: Luke Chen <showuon@gmail.com>
Remove the test constructor for PartitionAssignment and remove the TODO.
Also add KRaftClusterTest.testCreatePartitions to get more coverage for
createPartitions.
Reviewers: David Arthur <mumrah@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Currently, when initializing StandardAuthorizer, it waits until all ACLs are loaded and then completes the initialLoadFuture. So, checking the logs, we see:
2023-12-06 14:07:50,325 INFO [StandardAuthorizer 1] Initialized with 6 acl(s). (org.apache.kafka.metadata.authorizer.StandardAuthorizerData) [kafka-1-metadata-loader-event-handler]
2023-12-06 14:07:50,325 INFO [StandardAuthorizer 1] Completed initial ACL load process. (org.apache.kafka.metadata.authorizer.StandardAuthorizerData) [kafka-1-metadata-loader-event-handler]
But then, when shutting down the node, we will also see this error:
2023-12-06 14:12:32,752 ERROR [StandardAuthorizer 1] Failed to complete initial ACL load process. (org.apache.kafka.metadata.authorizer.StandardAuthorizerData) [kafka-1-metadata-loader-event-handler]
java.util.concurrent.TimeoutException
at kafka.server.metadata.AclPublisher.close(AclPublisher.scala:98)
at org.apache.kafka.image.loader.MetadataLoader.closePublisher(MetadataLoader.java:568)
at org.apache.kafka.image.loader.MetadataLoader.lambda$removeAndClosePublisher$7(MetadataLoader.java:528)
at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
at java.base/java.lang.Thread.run(Thread.java:840)
This is confusing. It happens because on close we try to complete the authorizer's initialLoadFuture if it is not already done, but we log the error regardless of whether it had already been completed. This patch improves the logging.
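A hedged sketch of the intended close-time behavior (not the literal AclPublisher/StandardAuthorizerData code; assumes initialLoadFuture is a CompletableFuture and log an slf4j logger):

java.util.concurrent.TimeoutException e = new java.util.concurrent.TimeoutException();
if (initialLoadFuture.completeExceptionally(e)) {
    // Only log if this call actually failed the future; if the initial load
    // already completed, close quietly instead of logging a spurious error.
    log.error("Failed to complete initial ACL load process.", e);
}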
Reviewers: Josep Prat <josep.prat@aiven.io>
When completing the partition reassignment, the new ISR should have all the target replicas.
Reviewers: Justine Olshan <jolshan@confluent.io>, David Mao <dmao@confluent.io>
During migration from ZK mode to KRaft mode, there is a step where the kcontrollers load all of the
data from ZK into the metadata log. Previously, we were using a batch size of 1000 for this, but
200 seems better. This PR also adds an internal configuration to control this batch size, for
testing purposes.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This patch causes the active KRaftMigrationDriver to reload the /migration ZK state after electing
itself as the leader in ZK. This closes a race condition where the previous active controller could
make an update to /migration after the new leader was elected. The update race was not actually a
problem regarding the data since both controllers would be syncing the same state from KRaft to ZK,
but the change to the znode causes the new controller to fail on the zk version check on
/migration.
This patch also fixes an as-yet-unseen bug where an active controller that failed to elect itself via
claimControllerLeadership would not retry.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This patch implements the new DescribeTopicPartitions RPC as defined in KIP-966 (ELR). Additionally, this patch adds a broker config "max.request.partition.size.limit" which limits the number of partitions returned by the new RPC.
Reviewers: Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>
This originally was #14489 which covered 2 aspects -- reloading on partition epoch changes where leader epoch did not change and reloading when leader epoch changed but we were already the leader.
I've cut out the second part of the change since the first part is much simpler.
This redefines the TopicDelta fields to better distinguish when a leader is elected (leader epoch bump) versus when a leader has ISR/replica changes (partition epoch bump). There are some cases where we bump the partition epoch but not the leader epoch; in those cases we do not need to perform operations that only care about the leader epoch bump (i.e., onElect callbacks).
Reviewers: Artem Livshits <alivshits@confluent.io>, José Armando García Sancio <jsancio@apache.org>
This PR creates MetadataVersion.latestTesting to represent the highest metadata version (which may be unstable) and MetadataVersion.latestProduction to represent the latest version that should be used in production. It fixes a few cases where the broker was advertising that it supported the testing versions even when unstable metadata versions had not been configured.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
Any directory changes must be considered when replaying
BrokerRegistrationChangeRecord. This is necessary
to persist directory failures in the cluster metadata,
which #14902 missed.
Reviewers: Omnia G.H Ibrahim <o.g.h.ibrahim@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
To avoid creating lots of small KRaft batches during the ZK migration, this patch adds a mechanism to merge batches into sizes of at least 1000. This has the effect of reducing the number of batches sent to Raft which reduces the amount of time spent blocking.
Since migrations use metadata transactions, the batch boundaries for migrated records are not significant. Even in light of that, this implementation does not break up existing batches. It will only combine them into a larger batch to meet the minimum size.
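A simplified sketch of that merge policy (the helper below is hypothetical, not the actual migration code), assuming java.util.List and java.util.ArrayList:

// Combine consecutive batches until each merged batch holds at least minSize records;
// existing batches are never split, only concatenated.
static <T> List<List<T>> mergeBatches(List<List<T>> batches, int minSize) {
    List<List<T>> merged = new ArrayList<>();
    List<T> current = new ArrayList<>();
    for (List<T> batch : batches) {
        current.addAll(batch);
        if (current.size() >= minSize) {
            merged.add(current);
            current = new ArrayList<>();
        }
    }
    if (!current.isEmpty()) {
        merged.add(current);
    }
    return merged;
}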
Reviewers: José Armando García Sancio <jsancio@apache.org>
Moves ELR from MetadataVersion IBP_3_7_IV3 into the new IBP_3_8_IV0 because the ELR feature was not completed before 3.7 reached feature freeze. Leaves IBP_3_7_IV3 empty -- it is a no-op and is not reused for anything. Adds the new MetadataVersion IBP_3_7_IV4 for the FETCH request changes from KIP-951, which were mistakenly never associated with a MetadataVersion. Updates the LATEST_PRODUCTION MetadataVersion to IBP_3_7_IV4 to declare both KRaft JBOD and the KIP-951 changes ready for production use.
Reviewers: Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Ron Dagostino <rdagostino@confluent.io>, Ismael Juma <ismael@juma.me.uk>, José Armando García Sancio <jsancio@apache.org>, Justine Olshan <jolshan@confluent.io>
MetadataShell should take an advisory lock on the .lock file of the directory it is reading from.
Add an integration test of this functionality in MetadataShellIntegrationTest.java.
Note: in build.gradle, I had to add some dependencies on server-common's test files in order to use
MockFaultHandler, etc.
MetadataBatchLoader.java: fix a case where a log message was incorrect. The intention was to print
the number equivalent to (offset + index). Instead it was printing the offset, followed by the
index. So if the offset was 100 and the index was 1, 1001 would be printed rather than 101.
Co-authored-by: Igor Soarez <i@soarez.me>
Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org>
When log directories fail, the broker will send a heartbeat listing the failed directories. This
PR implements processing offline directories in the controller's broker heartbeat handling. We
update broker registrations and generate leadership/ISR changes as necessary.
Co-authored-by: Colin P. McCabe <cmccabe@apache.org>
Reviewers: Ron Dagostino <rndgstn@gmail.com>
Handle AssignReplicasToDirs requests, persist metadata changes
with new directory assignments and possible leader elections.
Reviewers: Proven Provenzano <pprovenzano@confluent.io>, Ron Dagostino <rndgstn@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
BrokerHeartbeatManager should require a call to register(brokerId) before touch(brokerId)
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ron Dagostino <rndgstn@gmail.com>
Add the CurrentControllerId metric as described in KIP-1001. This gives us an easy way to identify the current controller by looking at the metrics of any Kafka node (broker or controller).
Reviewers: David Arthur <mumrah@gmail.com>
When creating partition registrations, directories must always be defined.
If creating a partition from a PartitionRecord or PartitionChangeRecord from an older version that
does not support directory assignments, then DirectoryId.MIGRATING is assumed.
If creating a new partition, or triggering a change in assignment, DirectoryId.UNASSIGNED should be
specified, unless the target broker has a single online directory registered, in which case the
replica should be assigned directly to that single directory.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Assign MetadataVersion.IBP_3_7_IV2 to JBOD.
Move KIP-966 support to MetadataVersion.IBP_3_7_IV3.
Create MetadataVersion.LATEST_PRODUCTION as the latest metadata version that can be used when formatting a
new cluster, or upgrading a cluster using kafka-features.sh. This will allow us to clearly distinguish between stable
and unstable metadata versions for the first time.
Reviewers: Igor Soarez <soarez@apple.com>, Ron Dagostino <rndgstn@gmail.com>, Calvin Liu <caliu@confluent.io>, Proven Provenzano <pprovenzano@confluent.io>
There is a race in the assertion on `capturedImages`. Since the future is signaled first, it is still possible to see an empty list. By adding to the collection first, we can ensure the assertion will succeed.
Reviewers: David Jacot <djacot@confluent.io>
The ControllerRegistration records added in KIP-919 should be written out to the metadata
image, not just the log.
Reviewers: José Armando García Sancio <jsancio@apache.org>
The metadata cache now considers registered log directories
and directory assignments when determining offline replicas.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Proven Provenzano <pprovenzano@confluent.io>
A new AssignmentsManager accumulates, batches, and sends KIP-858
assignment events to the Controller. Assignments are sent via
AssignReplicasToDirs requests.
Move QuorumTestHarness.formatDirectories into TestUtils so it can be
used in other test contexts.
Fix a bug in ControllerRegistration.java where the wrong version of the
record was being generated in ControllerRegistration.toRecord.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Proven Provenzano <pprovenzano@confluent.io>, Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>
Update BrokerRegistration to use a Builder. This fixes the proliferation of different constructors,
and makes it clear what arguments are being used where.
Reviewers: Colin P. McCabe <cmccabe@confluent.io>
meta.properties files are used by Kafka to identify log directories within the filesystem.
Previously, the code for handling them was in BrokerMetadataCheckpoint.scala. This PR rewrites the
code for handling them as Java and moves it to the apache.kafka.metadata.properties namespace. It
also gets rid of the separate types for v0 and v1 meta.properties objects. Having separate types
wasn't so bad back when we had a strict rule that zk clusters used v0 and kraft clusters used v1.
But ZK migration has blurred the lines. Now, a zk cluster may have either v0 or v1, if it is
migrating, and a kraft cluster may have either v0 or v1, at any time.
The new code distinguishes between an individual meta.properties file, which is represented by
MetaProperties, and a collection of meta.properties files, which is represented by
MetaPropertiesEnsemble. It is useful to have this distinction, because in JBOD mode, even if some
log directories are inaccessible, we can still use the ensemble to extract needed information like
the cluster ID. (Of course, even when not in JBOD mode, KRaft servers have always been able to
configure a metadata log directory separate from the main log directory.)
Since we recently added a unique directory.id to each meta.properties file, the previous convention
of passing a "canonical" MetaProperties object for the cluster around to various places in the code
needs to be revisited. After all, we can no longer assume all of the meta.properties files are the
same. This PR fixes these parts of the code. For example, it fixes the constructors of
ControllerApis and RaftManager to just take a cluster ID, rather than a MetaProperties object. It
fixes some other parts of the code, like the constructor of SharedServer, to take a
MetaPropertiesEnsemble object.
Another goal of this PR was to centralize meta.properties validation a bit more and make it
unit-testable. For this purpose, the PR adds MetaPropertiesEnsemble.verify, and a few other
verification methods. These enforce invariants like "the metadata directory must be readable," and
so on.
Reviewers: Igor Soarez <soarez@apple.com>, David Arthur <mumrah@gmail.com>, Divij Vaidya <diviv@amazon.com>, Proven Provenzano <pprovenzano@confluent.io>
The patch includes the following changes as part of KIP-966
* Allow ISR shrink to empty
* Allow leader election with ELR members
* Allow electing the last known leader
Reviewers: Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>
This PR adds resources to store and handle client metrics needed for KIP-714.
Changes include:
Adding CLIENT_METRICS to resource type
Corresponding DYNAMIC client configurations in resources.
Changes to support dynamic loading of configuration on changes.
Changes to support API calls to fetch data stored against the new resource.
Test cases for the changes.
Reviewers: Andrew Schofield <andrew_schofield@uk.ibm.com>, Philip Nee <pnee@confluent.io>, Jun Rao <junrao@gmail.com>
Reviewers: Christo Lolov <lolovc@amazon.com>, Colin P. McCabe <cmccabe@apache.org>, Proven Provenzano <pprovenzano@confluent.io>, Ron Dagostino <rdagostino@confluent.io>
As described in ticket KAFKA-15689, this PR fixes the logging of a migration event when the expected migration state is wrong.
Signed-off-by: Paolo Patierno <ppatierno@live.com>
Reviewers: Luke Chen <showuon@gmail.com>
Trivial PR to use the INFO level (instead of DEBUG) for logging the state transition during the ZooKeeper to KRaft migration.
I think this is useful information to log without requiring the user to increase the logging level.
Signed-off-by: Paolo Patierno <ppatierno@live.com>
Reviewers: Luke Chen <showuon@gmail.com>, hudeqi <1217150961@qq.com>
This field was missed by the initial KIP-919 PR(s). The result is that migrations can't begin since
the controllers will never become ready. This patch fixes that as well as pulls over some fixes
from the 3.6 branch.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This patch adds reconciliation logic to migrating ZK brokers to deal with pending topic deletions as well as missed StopReplicas.
During the hybrid mode of the ZK migration, the KRaft controller is asynchronously sending UMR and LISR to the ZK brokers to propagate metadata. Since this process is essentially "best effort" it is possible for a broker to miss a StopReplicas. The new logic lets the ZK broker examine its local logs compared with the full set of replicas in a "Full" LISR. Any local logs which are not present in the set of replicas in the request are removed from ReplicaManager and marked as "stray".
To avoid inadvertent data loss with this new behavior, the brokers do not delete the "stray" partitions. They will rename the directories and log warning messages during log recovery. It will be up to the operator to manually delete the stray partitions. We can possibly enhance this in the future to clean up old stray logs.
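A hedged sketch of the reconciliation check (helper and names are hypothetical; assumes org.apache.kafka.common.TopicPartition and java.util collections):

// Local logs not covered by the "Full" LeaderAndIsrRequest are strays; they are
// renamed on disk rather than deleted, leaving cleanup to the operator.
static Set<TopicPartition> findStrayPartitions(Set<TopicPartition> localLogs,
                                               Set<TopicPartition> replicasInFullRequest) {
    Set<TopicPartition> strays = new HashSet<>(localLogs);
    strays.removeAll(replicasInFullRequest);
    return strays;
}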
This patch makes use of the previously unused Type field on LeaderAndIsrRequest. This was added as part of KIP-516 but never implemented. Since its introduction, an implicit 0 was sent in all LISR. The KRaft controller will now send a value of 2 to indicate a full LISR (as specified by the KIP). The presence of this value acts as a trigger for the ZK broker to perform the log reconciliation.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This patch introduces preliminary changes for Eligible Leader Replicas (KIP-966)
* New MetadataVersion 16 (3.7-IV1)
* New record versions for PartitionRecord and PartitionChangeRecord
* New tagged fields on PartitionRecord and PartitionChangeRecord
* New static config "eligible.leader.replicas.enable" to gate the whole feature
Reviewers: Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
The PR includes:
* Added a new CleanShutdownFile class, which helps write to and read from a clean shutdown file.
* Updated the BrokerRegistration API.
* Client side handling for the broker epoch.
* Minimum work on the controller side.
Reviewers: Jun Rao <junrao@gmail.com>
Since only the active controller is performing the dual-write to ZK during a migration, it should be the only controller
to report the ZkWriteBehindLag metric.
Currently, if the controller fails over during a migration, the previous active controller will incorrectly report its last
value for ZkWriteBehindLag forever. Instead, it should report zero.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
In KIP-778 we introduced the "unsafe" (lossy) downgrade for the case where the metadata has changed in one of the versions between the target and the current version, as defined in MetadataVersion.
The documentation says it is possible:
"Note that the cluster metadata version cannot be downgraded to a pre-production 3.0.x, 3.1.x, or 3.2.x version once it has been upgraded. However, it is possible to downgrade to production versions such as 3.3-IV0, 3.3-IV1, etc."
The command line tool shows that this doesn't work:
bin/kafka-features.sh --bootstrap-server :9092 downgrade --metadata 3.4 --unsafe
Could not downgrade metadata.version to 8. Invalid metadata.version 8. Unsafe metadata downgrade is not supported in this version.
1 out of 1 operation(s) failed.
In addition to unsafe downgrades, safe metadata downgrades are also not supported in practice. For example, when you upgrade to 3.5, you land on 3.5-IV2 as the metadata version, which has metadata changes and won't let you downgrade. This is true for every other release at the moment.
This change fixes the documentation to reflect that, and improves the error messages.
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Reviewers: Luke Chen <showuon@gmail.com>, Jakub Scholz <github@scholzj.com>
This patch moves the TopicIdPartition from the metadata module to the server-common module so it can be used by the group-coordinator module as well.
Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, David Jacot <djacot@confluent.io>
This is one of the steps required for kafka to compile with Java 21.
For each case, one of the following fixes was applied:
1. Suppress warning if fixing would potentially result in an incompatible change (for public classes)
2. Add final to one or more methods so that the escape is not possible
3. Replace method calls with direct field access.
In addition, we also fix a couple of compiler warnings related to deprecated references in the `core` module.
See the following for more details regarding the new lint warning:
https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Chris Egerton <chrise@aiven.io>
Endpoint information provided by KafkaConfig can be incomplete in two ways. One is that endpoints
using ephemeral ports will show up as using port 0. Another is that endpoints binding to 0.0.0.0
will show up with a null or blank hostname. Because we were not accounting for this in controller
registration, it was leading to a null pointer dereference when we tried to register a controller
using an endpoint defined as PLAINTEXT://:9092.
This PR adds a ListenerInfo class which can fix both of the causes of incomplete endpoint
information. It also handles serialization to and from various RPC and record formats.
This allows us to remove a lot of boilerplate code and standardize the handling of listeners
between BrokerServer and ControllerServer.
Reviewers: David Arthur <mumrah@gmail.com>
This patch allows broker heartbeat events to be completed while a metadata transaction is in-flight.
More generally, this patch allows any RUNS_IN_PREMIGRATION event to complete while the controller
is in pre-migration mode even if the migration transaction is in-flight.
We had a problem with broker heartbeats timing out because they could not be completed while a large
ZK migration transaction was in-flight. This resulted in the controller fencing all the ZK brokers, which
had many undesirable downstream effects.
Reviewers: Akhilesh Chaganti <akhileshchg@users.noreply.github.com>, Colin Patrick McCabe <cmccabe@apache.org>
Implement KIP-919: Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add
Controller Registration. This KIP adds a new version of DescribeClusterRequest which is supported
by KRaft controllers. It also teaches AdminClient how to use this new DESCRIBE_CLUSTER request to
talk directly with the controller quorum. This is all gated behind a new MetadataVersion,
IBP_3_7_IV0.
In order to share the DESCRIBE_CLUSTER logic between broker and controller, this PR factors it out
into AuthHelper.computeDescribeClusterResponse.
The KIP adds three new errors codes: MISMATCHED_ENDPOINT_TYPE, UNSUPPORTED_ENDPOINT_TYPE, and
UNKNOWN_CONTROLLER_ID. The endpoint type errors can be returned from DescribeClusterRequest
On the controller side, the controllers now try to register themselves with the current active
controller, by sending a CONTROLLER_REGISTRATION request. This, in turn, is converted into a
RegisterControllerRecord by the active controller. ClusterImage, ClusterDelta, and all other
associated classes have been upgraded to propagate the new metadata. In the metadata shell, the
cluster directory now contains both broker and controller subdirectories.
QuorumFeatures previously had a reference to the ApiVersions structure used by the controller's
NetworkClient. Because this PR removes that reference, QuorumFeatures now contains only immutable
data. Specifically, it contains the current node ID, the locally supported features, and the list
of quorum node IDs in the cluster.
Reviewers: David Arthur <mumrah@gmail.com>, Ziming Deng <dengziming1993@gmail.com>, Luke Chen <showuon@gmail.com>
A follow-up to https://github.com/apache/kafka/pull/13804.
This follow-up adds the alternative fix approach mentioned in
the PR above - bumping the session timeout used in the test
by 1 second.
Reproducing the flake-out locally has been much harder than
on the CI runs, as neither Gradle with Java 11 or Java 14 nor
IntelliJ with Java 14 could show it, but IntelliJ with Java 11
could occasionally reproduce the failure the first time
immediately after a rebuild. While I was unable to see the
failure with the bumped session timeout, the testing procedure
definitely didn't provide sufficient reassurance for the
fix, as even without it I would often see hundreds of consecutive
successful test runs when the first run didn't fail.
Reviewers: Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>
When starting up a controller for the first time (i.e., with an empty log), it is possible for
MetadataLoader to publish an empty MetadataImage before the activation records of the controller
have been written. While this is not a bug, it could be confusing. This patch closes that gap by
waiting for at least one controller record to be committed before the MetadataLoader starts publishing
images.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
#14083 added support for delegation tokens in KRaft and attached that support to the existing
MetadataVersion 3.6-IV1. This patch moves that support into a separate MetadataVersion 3.6-IV2.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This PR contains three main changes:
- Support for transactions in MetadataLoader
- Abort in-progress transaction during controller failover
- Utilize transactions for ZK to KRaft migration
A new MetadataBatchLoader class is added to decouple the loading of record batches from the
publishing of metadata in MetadataLoader. Since a transaction can span across multiple batches (or
multiple transactions could exist within one batch), some buffering of metadata updates was needed
before publishing out to the MetadataPublishers. MetadataBatchLoader accumulates changes into a
MetadataDelta, and uses a callback to publish to the publishers when needed.
One small oddity with this approach is that since we can end up "splitting" batches in some cases, the
number of bytes returned in the LogDeltaManifest has new semantics. The number of bytes included in
a batch is now only reported in the last metadata update that is published as a result of that batch.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Reviewers: David Arthur <mumrah@gmail.com>, Ron Dagostino <rndgstn@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Viktor Somogyi <viktor.somogyi@cloudera.com>
Implement the QuorumController side of KRaft metadata transactions.
As specified in KIP-868, this PR creates a new metadata version, IBP_3_6_IV1, which contains the
three new records: AbortTransactionRecord, BeginTransactionRecord, EndTransactionRecord.
In order to make offset management unit-testable, this PR moves it out of QuorumController.java and
into OffsetControlManager.java. The general approach here is to track the "last stable offset," which is
calculated by looking at the latest committed offset and the in-progress transaction (if any). When
a transaction is aborted, we revert back to this last stable offset. We also revert back to it when
the controller is transitioning from active to inactive.
In a follow-up PR, we will add support for the transaction records in MetadataLoader. We will also
add support for automatically aborting pending transactions after a controller failover.
Reviewers: David Arthur <mumrah@gmail.com>
On the controller, move publishing acls to the Authorizer into a dedicated MetadataPublisher,
AclPublisher. This publisher listens for notifications from MetadataLoader, and receives only
committed data. This brings the controller side in line with how the broker has always worked. It
also avoids some ugly code related to publishing directly from the QuorumController. Most important
of all, it clears the way to implement metadata transactions without worrying about Authorizer
state (since it will be handled by the MetadataLoader, along with other metadata image state).
In AclsDelta, we can remove isSnapshotDelta. We always know when the MetadataLoader is giving us a
snapshot. Also bring AclsDelta in line with the other delta classes, where completeSnapshot
calculates the diff between the previous image and the next one. We don't use this delta (since we
just apply the image directly to the authorizer) but we should have it, for consistency.
Finally, change MockAclMutator to avoid the need to subclass AclControlManager.
Reviewers: David Arthur <mumrah@gmail.com>
Provide the exact record offset to QuorumController.replay() in all cases. There are several situations
where this is useful, such as logging, implementing metadata transactions, or handling broker
registration records.
In the case where the QC is inactive, and simply replaying records, it is easy to compute the exact
record offset from the batch base offset and the record index.
The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can
choose a batch base offset later than the one we expect, if someone else is also adding records.
While the QC is the only entity submitting data records, control records may be added at any time.
In the current implementation, these are really only used for leadership elections. However, this
could change with the addition of quorum reconfiguration or similar features.
Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it
would have resulted in a batch base offset other than what was expected. This in turn will trigger a
controller failover. In the future, if automatically added control records become more common, we
may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But
for now, this will allow us to rely on the offset as correct.
In order that the active QC can learn what offset to start writing at, the PR also adds a new
RaftClient#endOffset function.
At the Raft level, this PR adds a new exception, UnexpectedBaseOffsetException. This gets thrown
when we request a base offset that doesn't match the one the Raft layer would have given us.
Although this exception should cause a failover, it should not be considered a fault. This
complicated the exception handling a bit and motivated splitting more of it out into the new
EventHandlerExceptionInfo class. This will also let us unit test things like slf4j log messages a
bit better.
Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org>
This patch adds several metrics defined in KIP-866:
* MigratingZkBrokerCount: the number of zk brokers registered with KRaft
* ZkWriteDeltaTimeMs: time spent writing MetadataDelta to ZK
* ZkWriteSnapshotTimeMs: time spent writing MetadataImage to ZK
* Adds value 4 for "ZK" to ZkMigrationState
Also fixes a typo in the metric name introduced in #14009 (ZKWriteBehindLag -> ZkWriteBehindLag)
Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Implement some of the metrics from KIP-938: Add more metrics for
measuring KRaft performance.
Add these metrics to QuorumControllerMetrics:
kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
kafka.controller:type=KafkaController,name=NewActiveControllersCount
Create LoaderMetrics with these new metrics:
kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount
Create SnapshotEmitterMetrics with these new metrics:
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs
Reviewers: Ron Dagostino <rndgstn@gmail.com>
* Adds an exponential backoff, up to 1 minute, while the controller is waiting for brokers to show up (see the sketch after this list)
* Increases one-time logs to INFO
* Adds a summary of the migration records
* Use RecordRedactor for summary of migration batches (TRACE only)
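A minimal sketch of an exponential backoff capped at one minute (the starting delay and readiness check are illustrative, not the actual KRaftMigrationDriver code; assumes java.util.function.BooleanSupplier):

static void awaitBrokers(BooleanSupplier enoughBrokersRegistered) throws InterruptedException {
    long backoffMs = 100;                              // illustrative starting delay
    while (!enoughBrokersRegistered.getAsBoolean()) {
        Thread.sleep(backoffMs);
        backoffMs = Math.min(backoffMs * 2, 60_000);   // double each round, cap at one minute
    }
}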
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Standardize controller log4j output for replaying important records. The log message should include
word "replayed" to make it clear that this is a record replay. Log the replay of records for ACLs,
client quotas, and producer IDs, which were previously not logged. Also fix a case where we weren't
logging changes to broker registrations.
AclControlManager, ClientQuotaControlManager, and ProducerIdControlManager didn't previously have a
log4j logger object, so this PR adds one. It also converts them to using Builder objects. This
makes junit tests more readable because we don't need to specify parameters where the test can use
the default (like LogContexts).
Throw an exception in replay if we get another TopicRecord for a topic which already exists.
Example log messages:
INFO [QuorumController id=3000] Replayed a FeatureLevelRecord setting metadata version to 3.6-IV0
DEBUG [QuorumController id=3000] Replayed a ZkMigrationStateRecord which did not alter the state from NONE.
INFO [QuorumController id=3000] Replayed BrokerRegistrationChangeRecord modifying the registration for broker 0: BrokerRegistrationChangeRecord(brokerId=0, brokerEpoch=3, fenced=-1, inControlledShutdown=0)
INFO [QuorumController id=3000] Replayed ClientQuotaRecord for ClientQuotaEntity(entries={user=testkit}) setting request_percentage to 0.99.
Reviewers: Divij Vaidya <diviv@amazon.com>, Ron Dagostino <rndgstn@gmail.com>, David Arthur <mumrah@gmail.com>
Adds additional testing for the various KRaft *Image classes. For every image that we create we already test that we can get there by applying all the records corresponding to that image written out as a list of records. This patch adds more tests to confirm that we can get to each such final image with intermediate stops at all possible intermediate images along the way.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
Fixes a bug where we don't send UMR and LISR requests in dual-write mode when new partitions are created. Prior to this patch, KRaftMigrationZkWriter was mutating the internal data-structures of TopicDelta which prevented MigrationPropagator from sending UMR and LISR for the changed partitions.
Reviewers: David Arthur <mumrah@gmail.com>
This fixes a regression introduced by the previous KAFKA-15109 commit (d0457f7360 on trunk).
Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@apache.org>
While in migration mode, the KRaft controller must always bump the leader epoch when shrinking an ISR.
This is required to maintain compatibility with the ZK brokers. Without the epoch bump, the ZK brokers
will ignore the partition state change present in the LeaderAndIsrRequest since it would not contain a new
leader epoch.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This PR expands the scope of ApiVersionManager a bit to include returning the current
MetadataVersion and features that are in effect. This is useful in general because that information
needs to be returned in an ApiVersionsResponse. It also allows us to fix the ApiVersionManager
interface so that all subclasses implement all methods of the interface. Having subclasses that
don't implement some methods is dangerous because they could cause exceptions at runtime in
unexpected scenarios.
On the KRaft controller, we were previously performing a read operation in the QuorumController
thread to get the current metadata version and features. With this PR, we now read a volatile
variable maintained by a separate MetadataVersionContextPublisher object. This will improve
performance and simplify the code. It should not change the guarantees we are providing; in both
the old and new scenarios, we need to be robust against version skew scenarios during updates.
Add a Features class which just has a 3-tuple of metadata version, features, and feature epoch.
Remove MetadataCache.FinalizedFeaturesAndEpoch, since it just duplicates the Features class.
(There are some additional feature-related classes that can be consolidated in a follow-on PR.)
Create a java class, EndpointReadyFutures, for managing the futures associated with individual
authorizer endpoints. This avoids code duplication between ControllerServer and BrokerServer and
makes this code unit-testable.
Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>, Luke Chen <showuon@gmail.com>
When a LeaderAndIsr request is received, we notify the remoteLogManager about the leadership change to trigger the corresponding workflow. But LeaderAndIsr won't be sent in KRaft mode; instead, a TopicDelta is received.
This PR fixes the issue by deriving the leader and follower changes from the TopicDelta and triggering rlm.onLeadershipChange to notify the remote log manager. It also adds tests for remote-storage-enabled cases.
Reviewers: Satish Duggana <satishd@apache.org>
When the KRaft controller removes a replica from the ISR because of the controlled shutdown there is no need for the leader epoch to be increased by the KRaft controller. This is accurate as long as the topic partition leader doesn't add the removed replica back to the ISR.
This change also fixes a bug when computing the HWM. When computing the HWM, replicas that are not eligible to join the ISR but are caught up should not be included in the computation. Otherwise, the HWM will never increase for replica.lag.time.max.ms because the shutting-down replica is not sending FETCH requests. Without this additional fix, PRODUCE requests would time out if the request timeout is greater than replica.lag.time.max.ms.
Because of the bug above, the KRaft controller needs to check the MV to guarantee that all brokers support this bug fix before skipping the leader epoch bump.
Reviewers: David Mao <47232755+splett2@users.noreply.github.com>, Divij Vaidya <diviv@amazon.com>, David Jacot <djacot@confluent.io>
This creates a builder for PartitionRegistration. The motivation for the builder is that the constructor of PartitionRegistration has four arguments all of type int[] which makes it easy to make a mistake when using it.
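A hedged sketch of how a builder makes the int[] arguments self-describing (setter names are illustrative):

PartitionRegistration registration = new PartitionRegistration.Builder().
    setReplicas(new int[] {0, 1, 2}).
    setIsr(new int[] {0, 1, 2}).
    setRemovingReplicas(new int[] {}).
    setAddingReplicas(new int[] {}).
    setLeader(0).
    setLeaderEpoch(0).
    setPartitionEpoch(0).
    build();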
Reviewers: José Armando García Sancio <jsancio@apache.org>
In this test broker session timeout is configured aggressively low
(to 1 second) so that fencing can happen without much waiting. Then
in the final portion of the test when brokers should not be fenced
heartbeats are sent roughly 2 times in a session timeout window.
However the first time that's done there's other code between
sending the heartbeat and taking the timestamp, and in local tests
that code can take up to 0.5 seconds (1/2 of the session timeout).
That then can result in all brokers being fenced again which would
fail the test.
This change sends a heartbeat right when the timestamp is taken,
which in local tests reduces flaky failures from 4 out of 50 runs
to 0 out of 50.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
When the active quorum controller encounters an "unexpected" error, such as a NullPointerException,
it currently resigns its leadership. This PR fixes it so that in addition to doing that, it also
increments the metadata error count metric. This will allow us to better track down these errors.
This PR also fixes a minor bug where performing read operations on a standby controller would
result in an unexpected RuntimeException. The bug happened because the standby controller does not
take in-memory snapshots, and read operations were attempting to read from the epoch of the latest
committed offset. The fix is for the standby controller to simply read the latest value of each
data structure. This is always safe, because standby controllers don't contain uncommitted data.
Also, fix a bug where listPartitionReassignments was reading the latest data, rather than data from
the last committed offset.
Reviewers: dengziming <dengziming1993@gmail.com>, David Arthur <mumrah@gmail.com>
Fixed the metadata change events in the Migration component to check correctly for the diff in
existing topic changes and replicate the metadata to ZooKeeper. Also, made the diff check
exhaustive enough to handle partial writes in ZooKeeper when we try to replicate changes
using a snapshot in the event of a controller failover.
Add migration client and integration tests to verify the change.
Co-authored-by: Akhilesh Chaganti <akhileshchg@users.noreply.github.com>
This patch adds snapshot reconciliation during ZK to KRaft migration. This reconciliation happens whenever a snapshot is loaded by KRaft, or during a controller failover. Prior to this patch, it was possible to miss metadata updates coming from KRaft when dual-writing to ZK.
Internally this adds a new state SYNC_KRAFT_TO_ZK to the KRaftMigrationDriver state machine. The controller passes through this state after the initial ZK migration and each time a controller becomes active.
Logging during dual-write was enhanced to include a count of write operations happening.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
A CPU profile in a large cluster showed PartitionChangeBuilder.tryElection() taking significant CPU due to logging. We adjust the logging statements in that method for clean elections from DEBUG level to TRACE to mitigate the impact of this logging under normal operations. Unclean elections are now logged at the INFO level rather than DEBUG.
Reviewers: Jason Gustafson <jason@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
This patch fixes the case where a ClientQuota or SCRAM credential was added in KRaft, but not written back to ZK. This missed write only occurred when handling a KRaft snapshot. If the changed quota was processed in a metadata delta (which is the typical case), it would be written to ZK.
Reviewers: David Arthur <mumrah@gmail.com>
When the active KRaft controller is overloaded, it will not be able to process broker heartbeat
requests. Instead, they will be timed out. When using the default configuration, this will happen
if the time needed to process a broker heartbeat climbs above a second for a sustained period.
This, in turn, could lead to brokers being improperly fenced when they are still alive.
With this PR, timed out heartbeats will still update the lastContactNs and metadataOffset of the
broker in the BrokerHeartbeatManager. While we don't generate any records, this should still be
adequate to prevent spurious fencing. We also log a message at ERROR level so that this condition
will be more obvious.
Other small changes in this PR: fix grammar issue in log4j of BrokerHeartbeatManager. Add JavaDoc
for ClusterControlManager#zkMigrationEnabled field. Add builder for ReplicationControlTestContext
to avoid having tons of constructors. Update ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS to
match the default in KafkaConfig.
Reviewers: Ismael Juma <ijuma@apache.org>, Ron Dagostino <rdagostino@confluent.io>
This patch fixes several small bugs with configuration dual-write during migration.
* Topic configs are not written back to ZK while handling snapshot.
* New broker/topic configs in KRaft that did not exist in ZK will not be written to ZK.
* The sensitive configs are not encoded while writing them to Zookeeper.
* Handle topic configs in ConfigMigrationClient and KRaftMigrationZkWriter#handleConfigsSnapshot
Added tests to ensure we no longer have the above-mentioned issues.
Co-authored-by: Akhilesh Chaganti <akhileshchg@users.noreply.github.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Previously, if a user tried to perform an overly large batch operation on the KRaft controller
(such as creating a million topics), we would create a very large number of records in memory. Our
attempt to write these records to the Raft layer would fail, because there were too many to fit in
an atomic batch. This failure, in turn, would trigger a controller failover.
(Note: I am assuming here that no topic creation policy was in place that would prevent the
creation of a million topics. I am also assuming that the user operation must be done atomically,
which is true for all current user operations, since we have not implemented KIP-868 yet.)
With this PR, we fail immediately when the number of records we have generated exceeds the
threshold that we can apply. This failure does not generate a controller failover. We also now
fail with a PolicyViolationException rather than an UnknownServerException.
In order to implement this in a simple way, this PR adds the BoundedList class, which wraps any
list and adds a maximum length. Attempts to grow the list beyond this length cause an exception to
be thrown.
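For illustration, here is a minimal sketch of the idea behind such a bounded wrapper (the class name, exception type, and methods below are assumptions, not the actual BoundedList source):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a list wrapper that refuses to grow past a fixed maximum length.
public class BoundedListSketch<E> {
    private final List<E> delegate = new ArrayList<>();
    private final int maxLength;

    public BoundedListSketch(int maxLength) {
        this.maxLength = maxLength;
    }

    public void add(E element) {
        // Fail fast instead of accumulating an unbounded number of records in memory.
        if (delegate.size() >= maxLength) {
            throw new IllegalStateException("Cannot add another element: the list already has " +
                maxLength + " elements.");
        }
        delegate.add(element);
    }

    public int size() {
        return delegate.size();
    }
}
```

The controller can then translate that failure into the PolicyViolationException mentioned above, without triggering a failover.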
Reviewers: David Arthur <mumrah@gmail.com>, Ismael Juma <ijuma@apache.org>, Divij Vaidya <diviv@amazon.com>
The local variable processedRecordsSize was just left over from another commit and can be safely removed.
Reviewers: Divij Vaidya <diviv@amazon.com> , José Armando García Sancio <jsancio@apache.org>
Fixed a bug during dual write mode where if a user is updating SCRAM records and has no quotas, the SCRAM records will not be written to ZK. Add tests explicitly for this scenario.
Reviewers: David Arthur <mumrah@gmail.com>
Metadata image classes such as MetadataImage, ClusterImage, FeaturesImage, and so forth contain
numerous sub-images. This PR adds a structured way of traversing those sub-images. This is useful
for the metadata shell, and also for implementing toString functions.
In both cases, the previous solution was suboptimal. The metadata shell was previously implemented
in an ad-hoc way by mutating text-based tree nodes when records were replayed. This was difficult
to keep in sync with changes to the record types (for example, we forgot to do this for SCRAM). It
was also pretty low-level, being done at a level below that of the image classes. For toString, it
was difficult to keep the implementations consistent previously, and also support both redacted and
non-redacted output.
The metadata shell directory was getting crowded since we never had submodules for it. This PR
creates glob/, command/, node/, and state/ directories to keep things better organized.
Reviewers: David Arthur <mumrah@gmail.com>, Ron Dagostino <rdagostino@confluent.io>
When loading a snapshot during dual-write mode, we were missing the logic to detect new ACLs that
had been added on the KRaft side. This patch adds support for finding those new ACLs as well as tests
to verify the correct behavior.
Reviewers: David Arthur <mumrah@gmail.com>
Use the MetadataVersion from the MetadataImage passed to MetadataPropagator. This ensures the propagator
sends the right versions of UMR, LISR and StopReplica requests when the migration is in DUAL_WRITE mode.
Reviewers: David Arthur <mumrah@gmail.com>
Rename handleSnapshot to handleLoadSnapshot to make it explicit that it is handling snapshot load,
not generation.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
1. Add a hint for the "BROKER_LOGGER" case in the ConfigResourceExistenceChecker switch; otherwise it falls through to the default branch and is silently dropped. I don't know whether adding a hint is better than deleting the case outright.
2. Delete some unused variables and methods.
3. Add the @Test annotation to a unit test method where it had been forgotten.
Reviewers: dengziming <dengziming1993@gmail.com>
The KRaft controller returns empty finalized features in `ApiVersionResponse`. The brokers are not affected by this, so the problem currently has no impact, but it's worth fixing to avoid unexpected problems later.
There is also a bunch of confusing methods in `ApiVersionResponse` that are only used in test code. I moved them to TestUtils to make the code clearer, and to force everyone to pass in the correct parameters instead of the default zero parameters, for example empty supported features and empty finalized features.
Reviewers: Luke Chen <showuon@gmail.com>
This patch adds support for handling metadata snapshots while in dual-write mode. Prior to this change, if the active
controller loaded a snapshot, it would get out of sync with the ZK state.
In order to reconcile the snapshot state with ZK, several methods were added to scan through the metadata in ZK to
compute differences with the MetadataImage. Since this introduced a lot of code, I opted to split out a lot of methods
from ZkMigrationClient into their own client interfaces, such as TopicMigrationClient, ConfigMigrationClient, and
AclMigrationClient. Each of these has an iterator method that lets the caller examine the ZK state in a single pass
without using too much memory.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Luke Chen <showuon@gmail.com>
When creating the QuorumController, log whether ZK migration is enabled.
When applying a feature level record which sets the metadata version, log the metadata version enum
rather than the numeric feature level.
Improve the logging when we replay snapshots in QuorumController. Log both the beginning and the
end of replay.
When TRACE is enabled, log every record that is replayed in QuorumController. Since some records
may contain sensitive information, create RecordRedactor to assist in logging only what is safe to
put in the log4j file.
Add logging to ControllerPurgatory. Successful completions are logged at DEBUG; failures are logged
at INFO, and additions are logged at TRACE.
Remove SnapshotReason.java, SnapshotReasonTest.java, and
QuorumController#generateSnapshotScheduled. They are dead code now that snapshot generation has moved to
org.apache.kafka.image.publisher.SnapshotGenerator.
Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org>
Handle migrating SCRAM records in ZK when migrating from ZK to KRaft.
This includes handling writing back SCRAM records to ZK while in dual write mode where metadata updates are written to both the KRaft metadata log and to ZK. This allows for rollback of migration to include SCRAM metadata changes.
Reviewers: David Arthur <mumrah@gmail.com>
1. Add ZkMigrationReady to ApiVersionsResponse.
2. Check that all nodes are ZkMigrationReady before moving to the next migration state.
Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
Remove a spurious call to fatalFaultHandler accidentally introduced by KAFKA-14805. We should only
invoke the fatal fault handler if we are unable to generate the activation records. If we are
unable to write the activation records, a controller failover should be sufficient to remedy the
situation.
Co-authored-by: Luke Chen <showuon@gmail.com>
Reviewers: Luke Chen <showuon@gmail.com>, David Arthur <mumrah@gmail.com>
Don't allow setting negative or zero values for quotas. Don't allow SCRAM mechanism names to be
used as client quota names. SCRAM mechanisms are not client quotas. (The confusion arose because of
internal ZK representation details that treated them both as "client configs.")
Add unit tests for ClientQuotaControlManager.isValidIpEntity and
ClientQuotaControlManager.configKeysForEntityType.
This change doesn't affect metadata record application, only input validation. If there are bad
client quotas that are set currently, this change will not alter the current behavior (of throwing
an exception and ignoring the bad quota).
This patch adds the concept of pre-migration mode to the KRaft controller. While in this mode,
the controller will only allow certain write operations. The purpose of this is to disallow metadata
changes when the controller is waiting for the ZK migration records to be committed.
The following ControllerWriteEvent operations are permitted in pre-migration mode
* completeActivation
* maybeFenceReplicas
* writeNoOpRecord
* processBrokerHeartbeat
* registerBroker (only for migrating ZK brokers)
* unregisterBroker
Raft events and other controller events do not follow the same code path as ControllerWriteEvent,
so they are not affected by this new behavior.
This patch also adds a new metric as defined in KIP-868: kafka.controller:type=KafkaController,name=ZkMigrationState
In order to support upgrades from 3.4.0, this patch also redefines enum value 1 to mean
MIGRATION rather than PRE_MIGRATION.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
This patch renames from `ControllerPurgatory` to `DeferredEventQueue` and moves it from the `metadata` module to `server-common` module.
Reviewers: Alexandre Dupriez <alexandre.dupriez@gmail.com>, Ziming Deng <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@apache.org>
Currently, StandardAuthorizer uses a read/write lock for maintaining the consistency of data. For clusters with very high traffic, we typically see an increase in latencies whenever a write operation arrives. The intent of this PR is to get rid of the R/W lock with the help of immutable or persistent collections. Basically, new object references are used to hold the intermediate state of a write operation. After the operation completes, the main reference to the cache is switched to point to the new object. Also, for reads, the code is changed so that all accesses to the cache during a single read operation go to one particular cache object only.
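To illustrate that copy-on-write pattern, here is a minimal, self-contained sketch (hypothetical names; plain JDK collections stand in for the immutable/persistent ones):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Sketch: lock-free reads via an immutable snapshot published under a volatile reference.
public class AclCacheSketch {
    // Readers dereference this once and then only touch that immutable snapshot.
    private volatile Set<String> acls = Collections.emptySet();

    // Writers copy, mutate the copy, then publish it atomically by reassigning the reference.
    public synchronized void addAcl(String acl) {
        Set<String> next = new HashSet<>(acls); // a persistent collection would avoid this full copy
        next.add(acl);
        acls = Collections.unmodifiableSet(next);
    }

    // A read works against a single consistent snapshot and never takes a lock.
    public boolean contains(String acl) {
        Set<String> snapshot = acls;
        return snapshot.contains(acl);
    }
}
```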
In the PR description, you can find the performance of various libraries for both reads and writes. Read performance is checked with the existing AuthorizerBenchmark. For write performance, a new AuthorizerUpdateBenchmark has been added which evaluates the performance of the addAcl operation.
Reviewers: Ron Dagostino <rndgstn@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Divij Vaidya <diviv@amazon.com>
Rework UserScramCredentialRecord to store serverKey and storedKey rather than saltedPassword. This
is necessary to support migration from ZK, since those are the fields we stored in ZK. Update
latest MetadataVersion to IBP_3_5_IV2 and make SCRAM support conditional on this version. Moved
ScramCredentialData.java from org.apache.kafka.image to org.apache.kafka.metadata, which seems more
appropriate.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Introduces the use of persistent data structures in the KRaft metadata image to avoid copying the entire TopicsImage upon every change. Performance that was O(<number of topics in the cluster>) is now O(<number of topics changing>), which yields dramatic time and GC improvements for the most common topic-related metadata events. We abstract away the chosen underlying persistent collection library via ImmutableMap<> and ImmutableSet<> interfaces and static factory methods.
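As a rough illustration of the structural sharing that makes updates O(number of topics changing), here is a sketch using the pcollections library's persistent map; the Kafka code hides the concrete library behind the ImmutableMap/ImmutableSet interfaces mentioned above, so the library and calls shown here are an assumption for demonstration only:

```java
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;

public class PersistentTopicsSketch {
    public static void main(String[] args) {
        // "Image" version 1: two topics mapped to their partition counts.
        PMap<String, Integer> topicsV1 = HashTreePMap.<String, Integer>empty()
            .plus("foo", 3)
            .plus("bar", 6);

        // Applying a change produces a new map that shares unchanged entries with the old one,
        // so the cost is proportional to what changed, not to the total number of topics.
        PMap<String, Integer> topicsV2 = topicsV1.plus("foo", 4);

        System.out.println(topicsV1.get("foo")); // 3 -- the old image is untouched
        System.out.println(topicsV2.get("foo")); // 4 -- the new image sees the change
    }
}
```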
Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>, Purshotam Chauhan <pchauhan@confluent.io>
Although KAFKA-14808 did not affect KRaft mode, it is important to ensure that we have regression
tests in KRaft mode to prevent a similar bug from appearing there in the future. This PR adds two
tests. First, it adds a test that makes sure we handle what happens when a reassignment completes
and none of the new replicas can be made leader. It's important that we don't keep an old replica as
leader. Second, it adds a test that makes sure we handle new reassignments that don't include a
previous assignment replica that was leader.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
The MetadataLoader must call finishSnapshot after loading a snapshot. This function removes
whatever was in the old snapshot that is not in the new snapshot that was just loaded. While this
is not significant when the old snapshot was the empty snapshot, it is important to do when we are
loading a snapshot on top of an existing non-empty image.
In initializeNewPublishers, the newly installed publishers should be given a MetadataDelta based on
MetadataImage.EMPTY, reflecting the fact that they are seeing everything for the first time.
Reviewers: David Arthur <mumrah@gmail.com>
The SnapshotReader exposes the "last contained log time". This is mainly used during snapshot cleanup. The previous implementation used the append time of the snapshot record. This is not accurate as this is the time when the snapshot was created and not the log append time of the last record included in the snapshot.
The log append time of the last record included in the snapshot is stored in the header control record of the snapshot. The header control record is the first record of the snapshot.
To be able to read this record, this change extends the RecordsIterator to decode and expose the control records in the Records type.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
This is a small patch to make it so we only create one FeatureControlManager instance in ReplicationControlManagerTest. Currently we create two, which isn't needed. It's also a bit confusing because the ReplicationControlTestContext object ends up holding a different FeatureControlManager reference than the one its own ReplicationControlManager instance has a reference to.
Reviewers: José Armando García Sancio <jsancio@apache.org>, dengziming <dengziming1993@gmail.com>
The MetadataLoader is not supposed to publish metadata updates until we have loaded up to the high
water mark. Previously, this logic was broken, and we published updates immediately. This PR fixes
that and adds a junit test.
Another issue is that the MetadataLoader previously assumed that we would periodically get
callbacks from the Raft layer even if nothing had happened. We relied on this to install new
publishers in a timely fashion, for example. However, in older MetadataVersions that don't include
NoOpRecord, this is not a safe assumption.
Aside from the above changes, also fix a deadlock in SnapshotGeneratorTest, fix the log prefix for
BrokerLifecycleManager, and remove metadata publishers on BrokerServer shutdown (like we do for
controllers).
Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
Currently, we have various bits of reassignment logic spread across different classes. For example, ReplicationControlManager contains logic for when a reassignment is in progress, which is duplicated in PartitionChangeBuilder. Another example is PartitionReassignmentRevert, which contains logic for how to undo/revert a reassignment. The idea here is to move the logic to PartitionReassignmentReplicas so it's more testable and easier to reason about.
Reviewers: José Armando García Sancio <jsancio@apache.org>
This patch refactors the loadCache method in AclAuthorizer to make it reusable by ZkMigrationClient.
The loaded ACLs are converted to AccessControlEntryRecord. I noticed we still have the defunct
AccessControlRecord, so I've deleted it.
Also included here are the methods to write ACL changes back to ZK while in dual-write mode.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Colin P. McCabe <cmccabe@apache.org>
Separate out KRaft controller metrics into two groups: metrics directly managed by the
QuorumController, and metrics handled by an external publisher. This separation of concerns makes
the code easier to reason about, by clarifying what metrics can be changed where.
The external publisher, ControllerServerMetricsPublisher, handles all metrics which are related to
the content of metadata, for example metrics about the number of topics or the number of partitions.
It fits into the MetadataLoader metadata publishing framework as another publisher. Since
ControllerServerMetricsPublisher operates off of a MetadataImage, we don't have to create
(essentially) another copy of the metadata in memory, as ControllerMetricsManager does. This reduces
memory consumption. Another benefit of operating off of the MetadataImage is that we don't have to
have special handling for each record type, like we do now in ControllerMetricsManager.
Reviewers: David Arthur <mumrah@gmail.com>
Updates ReplicationControlManager and PartitionReassignmentReplicas to use PartitionAssignment.
Reviewers: José Armando García Sancio <jsancio@apache.org>
Standardize KRaft thread names.
- Always use kebab case. That is, "my-thread-name".
- Thread prefixes are just strings, not Option[String] or Optional<String>.
If you don't want a prefix, use the empty string.
- Thread prefixes end in a dash (except the empty prefix). Then you can
  calculate thread names as $prefix + "my-thread-name" (see the short sketch after this list).
- Broker-only components get "broker-$id-" as a thread name prefix. For example, "broker-1-"
- Controller-only components get "controller-$id-" as a thread name prefix. For example, "controller-1-"
- Shared components get "kafka-$id-" as a thread name prefix. For example, "kafka-0-"
- Always pass a prefix to KafkaEventQueue, so that threads have names like
"broker-0-metadata-loader-event-handler" rather than "event-handler". Prior to this PR, we had
several threads just named "EventHandler" which was not helpful for debugging.
- QuorumController thread name is "quorum-controller-123-event-handler"
- Don't set a thread prefix for replication threads started by ReplicaManager. They run only on the
broker, and already include the broker ID.
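A trivial sketch of the prefix convention referenced above (the helper and any base names other than the metadata-loader example are illustrative):

```java
public class ThreadNameSketch {
    // Compose a full thread name from a component prefix plus a kebab-case base name.
    // Prefixes already end in a dash, or are the empty string when no prefix is wanted.
    static String threadName(String prefix, String baseName) {
        return prefix + baseName;
    }

    public static void main(String[] args) {
        // Broker-only component on broker 0 (example from the description above):
        System.out.println(threadName("broker-0-", "metadata-loader-event-handler"));
        // -> broker-0-metadata-loader-event-handler

        // Component with no prefix configured:
        System.out.println(threadName("", "event-handler"));
        // -> event-handler
    }
}
```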
Standardize KRaft slf4j log prefixes.
- Names should be of the form "[ComponentName id=$id] ". So for a ControllerServer with ID 123, we
will have "[ControllerServer id=123] "
- For the QuorumController class, use the prefix "[QuorumController id=$id] " rather than
"[Controller <nodeId] ", to make it clearer that this is a KRaft controller.
- In BrokerLifecycleManager, add isZkBroker=true to the log prefix for the migration case.
Standardize KRaft terminology.
- All synonyms of combined mode (colocated, coresident, etc.) should be replaced by "combined"
- All synonyms of isolated mode (remote, non-colocated, distributed, etc.) should be replaced by
"isolated".
This patch fixes many small issues to improve error handling and logging during the ZK migration. A test was added
to simulate a ZK session expiration to ensure the correctness of the migration driver.
With this change, ZK errors thrown during the migration will not hit the fault handler registered with
KRaftMigrationDriver, but they will be logged.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This patch is the first part of KIP-903. It updates the FetchRequest to include the new tagged ReplicaState field which replaces the now deprecated ReplicaId field. The FetchRequest version is bumped to version 15 and the MetadataVersion to 3.5-IV1.
Reviewers: David Jacot <djacot@confluent.io>
This PR refactors MetadataPublisher's interface a bit. There is now an onControllerChange
callback. This is something that some publishers might want. A good example is ZkMigrationClient.
Instead of two different publish functions (one for snapshots, one for log deltas), we now have a single onMetadataUpdate function. Most publishers didn't want to do anything different in those two cases.
The ones that do want to do something different for snapshots can always check the manifest type.
The close function now has a default empty implementation, since most publishers didn't need to do
anything there.
Move the SCRAM logic out of BrokerMetadataPublisher and run it on the controller as well.
On the broker, simply use dynamicClientQuotaPublisher to handle dynamic client quotas changes.
That is what the controller already does, and the code is exactly the same in both cases.
Fix the logging in FutureUtils.waitWithLogging a bit. Previously, when invoked from BrokerServer
or ControllerServer, it did not include the standard "[Controller 123] " style prefix indicating server
name and ID. This was confusing, especially when debugging junit tests.
Reviewers: Ron Dagostino <rdagostino@confluent.io>, David Arthur <mumrah@gmail.com>
Implement KIP-599 controller mutation quotas for the KRaft controller. These quotas apply to create
topics, create partitions, and delete topic operations. They are specified in terms of number of
partitions.
The approach taken here is to reuse the ControllerMutationQuotaManager that is also used in ZK
mode. The quotas are implemented as Sensor objects and Sensor.checkQuotas enforces the quota,
whereas Sensor.record notes that new partitions have been modified. While ControllerApis handles
fetching the Sensor objects, we must make the final callback to check the quotas from within
QuorumController. The reason is that only QuorumController knows the final number of partitions
that must be modified. (As one example, up-to-date information about the number of partitions that
will be deleted when a topic is deleted is really only available in QuorumController.)
For quota enforcement, the logic is already in place. The KRaft controller is expected to set the
throttle time in the response that is embedded in EnvelopeResponse, but it does not actually apply
the throttle because there is no client connection to throttle. Instead, the broker that forwarded
the request is expected to return the throttle value from the controller and to throttle the client
connection. It also applies its own request quota, so the enforced/returned quota is the maximum of
the two.
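As a rough, self-contained sketch of that Sensor-based pattern (example metric names and quota values; this is not the actual ControllerMutationQuotaManager code):

```java
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.metrics.QuotaViolationException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public class MutationQuotaSketch {
    public static void main(String[] args) throws Exception {
        try (Metrics metrics = new Metrics()) {
            Sensor sensor = metrics.sensor("controller-mutation-sketch");
            // Quota of 10 partition mutations per second (an arbitrary example value).
            sensor.add(metrics.metricName("mutation-rate", "controller-mutation-sketch"),
                new Rate(), new MetricConfig().quota(Quota.upperBound(10)));

            try {
                sensor.record(25);    // note that 25 partitions were just created/deleted
                sensor.checkQuotas(); // throws if the measured rate now exceeds the quota
            } catch (QuotaViolationException e) {
                // The controller would report a throttle time here; the forwarding broker
                // is the one that actually throttles the client connection.
                System.out.println("Quota violated: " + e.getMessage());
            }
        }
    }
}
```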
This PR also installs a DynamicConfigPublisher in ControllerServer. This allows dynamic
configurations to be published on the controller. Previously, they could be set, but they were not
applied. Note that we still don't have a good way to set node-level configurations for isolated
controllers. However, this will allow us to set cluster configs (aka default node configs) and have
them take effect on the controllers.
In a similar vein, this PR separates out the dynamic client quota publisher logic used on the
broker into DynamicClientQuotaPublisher. We can now install this on both BrokerServer and
ControllerServer. This makes dynamically configuring quotas (such as controller mutation quotas)
possible.
Also add a ducktape test, controller_mutation_quota_test.py.
Reviewers: David Jacot <djacot@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Colin P. McCabe <cmccabe@apache.org>
This commit adds support to store the SCRAM credentials in a cluster with KRaft quorum servers and
no ZK cluster backing the metadata. This includes creating ScramControlManager in the controller,
and adding support for SCRAM to MetadataImage and MetadataDelta.
Change UserScramCredentialRecord to contain only a single tuple (name, mechanism, salt, pw, iter)
rather than a mapping between name and a list. This will avoid creating an excessively large record
if a single user has many entries. Because record ID 11 (UserScramCredentialRecord) has not been
used before, this is a compatible change. SCRAM will be supported in 3.5-IV0 and later.
This commit does not include KIP-900 SCRAM bootstrapping support, or updating the credential cache
on the controller (as opposed to broker). We will implement these in follow-on commits.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Some minor improvements to the JavaDoc for ZkMigrationState.
Rename MigrationState to MigrationDriverState to avoid confusion with ZkMigrationState.
Remove ClusterImage#zkBrokers. This costs O(num_brokers) time to calculate, but is only ever used
when in migration state. It should just be calculated in the migration code. (Additionally, the
function ClusterImage.zkBrokers() returns something other than ClusterImage#zkBrokers, which is
confusing.)
Also remove ClusterDelta#liveZkBrokerIdChanges. This is only used in one place, and it's easy to
calculate it there. In general we should avoid providing expensive accessors unless absolutely
necessary. Expensive code should look expensive: if people want to iterate over all brokers, they
can write a loop to do that rather than hiding it inside an accessor.
Added the following checks -
* In StandardAuthorizerData.authorize() to fail if `patternType` other than `LITERAL` is passed.
* In AclControlManager.addAcl() to fail if Resource Name is null or empty.
Also, `AclAuthorizerTest` includes a lot of tests covering various scenarios that are missing in `StandardAuthorizerTest`. This PR changes the AclAuthorizerTest to run tests for both `zk` and `kraft` modes -
* Rename AclAuthorizerTest -> AuthorizerTest
* Parameterize relevant tests to run for both modes
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
* assertEquals called on array
* Method is identical to its super method
* Simplifiable assertions
* Unused imports
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
Only send UMR to ZK brokers if the cluster metadata or topic metadata has changed.
Reviewers: Akhilesh C <akhileshchg@users.noreply.github.com>, Colin P. McCabe <cmccabe@apache.org>
The KIP-405 MetadataVersion changes will be released as part of AK 3.5, but were added as IBP_3_4_IV1.
This change fixes them to be IBP_3_5_IV0. There is no incompatibility because this feature has not yet
been released. Also set didMetadataChange to false because KRaft metadata log records did not change.
Reviewers: Satish Duggana <satishd@apache.org>, Christo Lolov <christo_lolov@yahoo.com>, Colin P. McCabe <cmccabe@apache.org>
When in migration-from-ZK mode and sending RPCs to ZK-based brokers, the KRaft controller must send
full UpdateMetadataRequests prior to sending full LeaderAndIsrRequests. If the controller sends the
requests in the other order, and the ZK-based broker does not already know about some of the nodes
referenced in the LeaderAndIsrRequest, it will reject the request.
This PR includes an integration test, and a number of other small fixes for dual-write.
Co-authored-by: Akhilesh C <akhileshchg@users.noreply.github.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
The KRaft client expects the offset of the snapshot id to be an end offset. End offsets are
exclusive. The MetadataProvenance type was creating a snapshot id using the last contained offset
which is inclusive. This change fixes that and renames some of the fields to make this difference
more obvious.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
### JIRA
https://issues.apache.org/jira/browse/KAFKA-14612
### Details
Makes sure we emit `ConfigRecord`s for a topic iff it actually gets created. Currently, we might emit `ConfigRecord`s even if the topic creation fails later in the `createTopics` method.
I created a new method `incrementalAlterConfig` in `ConfigurationControlManager` that is similar to `incrementalAlterConfigs` but handles just one config at a time. This is used in `ReplicationControlManager` for each topic. By handling one topic's config at a time, it's easier to isolate each topic's config records. This enables us to make sure we only write config records for topics that get created.
I refactored `incrementalAlterConfigResource` to return an `ApiError`. This made it easier to implement the new method `incrementalAlterConfig` in `ConfigurationControlManager` because it then doesn't have to search in the `Map` for the result.
### Testing
Enhanced pre-existing test `ReplicationControlManagerTest.testCreateTopicsWithConfigs`. I ran the tests without the changes to `ReplicationControlManager` and made sure each assertion ends up failing. Also ran `./gradlew metadata:test --tests org.apache.kafka.controller.ReplicationControlManagerTest`.
Reviewers: Jason Gustafson <jason@confluent.io>
If KafkaEventQueue gets an InterruptedException while waiting for a condition variable, it
currently exits immediately. Instead, it should complete the remaining events exceptionally and
then execute the cleanup event. This will allow us to finish any necessary cleanup steps.
In order to do this, we require the cleanup event to be provided when the queue is constructed,
rather than when it's being shut down.
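A sketch of the intended shutdown behavior (hypothetical types and names, not the real KafkaEventQueue):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: drain pending events exceptionally, then run the cleanup event.
public class InterruptibleQueueSketch {
    interface Event {
        void run() throws Exception;
        void completeExceptionally(Throwable cause);
    }

    private final Deque<Event> pending = new ArrayDeque<>();
    private final Event cleanupEvent; // supplied when the queue is constructed, not at shutdown

    InterruptibleQueueSketch(Event cleanupEvent) {
        this.cleanupEvent = cleanupEvent;
    }

    void onInterrupted(InterruptedException cause) {
        // Instead of exiting immediately, fail every remaining event so callers are unblocked.
        Event event;
        while ((event = pending.poll()) != null) {
            event.completeExceptionally(cause);
        }
        // Then run the cleanup event so any necessary cleanup steps still happen.
        try {
            cleanupEvent.run();
        } catch (Exception e) {
            // Nothing more to do during shutdown; swallow so the handler thread can exit.
        }
    }
}
```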
Also, handle cases where Event#handleException itself throws an exception.
Remove timed shutdown from the event queue code since nobody was using it, and it adds complexity.
Add server-common/src/test/resources/test/log4j.properties since this gradle module somehow avoided
having a test log4j.properties up to this point.
Reviewers: David Arthur <mumrah@gmail.com>
With the new broker epoch validation logic introduced in #12998, we no longer need the ZK broker epoch to be sent to the KRaft controller. This patch removes that epoch and replaces it with a boolean.
Another small fix is included in this patch for controlled shutdown in migration mode. Previously, if a ZK broker was in migration mode, it would always try to do controlled shutdown via BrokerLifecycleManager. Since there is no ordering dependency between bringing up ZK brokers and the KRaft quorum during migration, a ZK broker could be running in migration mode but talking to a ZK controller. A small check was added to see whether the current controller is ZK or KRaft before deciding which controlled shutdown to attempt.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Implements the `toString` method for the classes `TopicAssignment` and `PartitionAssignment`. Also removes the `final` keyword from the constructor arguments for consistency.
Reviewers: José Armando García Sancio <jsancio@apache.org>
This patch introduces a preliminary state machine that can be used by the KRaft
controller to drive online migration from ZK to KRaft.
MigrationState -- Defines the states we can have while migrating from ZK to
KRaft.
KRaftMigrationDriver -- Defines the state transitions and events to handle
actions like controller change, metadata change, and broker change, and has
interfaces through which it claims ZK controllership, performs ZK writes, and
sends RPCs to ZK brokers.
MigrationClient -- Interface that defines the functions used to claim and
relinquish ZK controllership, and to read from and write to ZK.
Co-authored-by: David Arthur <mumrah@gmail.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
In KRaft, when the controller fails to handle an event, we log an error and return it to the broker. But in some cases, we only log the error class name and return just the class name to the broker, which is not useful for troubleshooting. For example, when broker registration failed with an unsupported version error, it showed:
2022-12-28T17:46:42.876+0800 [DEBUG] [TestEventLogger] [2022-12-28 17:46:42,877] INFO [Controller 3000] registerBroker: failed with UnsupportedVersionException in 2888 us (org.apache.kafka.controller.QuorumController:447)
2022-12-28T17:46:42.877+0800 [DEBUG] [TestEventLogger] [2022-12-28 17:46:42,878] INFO [BrokerLifecycleManager id=0] Unable to register broker 0 because the controller returned error UNSUPPORTED_VERSION (kafka.server.BrokerLifecycleManager:66)
Checking the logs, we still don't know which version it supports.
After this PR, it will show:
2022-12-28T17:54:59.671+0800 [DEBUG] [TestEventLogger] [2022-12-28 17:54:59,671] INFO [Controller 3000] registerBroker: failed with UnsupportedVersionException in 291 us. Reason: Unable to register because the broker does not support version 8 of metadata.version. It wants a version between 4 and 4, inclusive. (org.apache.kafka.controller.QuorumController:447)
2022-12-28T17:54:59.671+0800 [DEBUG] [TestEventLogger] [2022-12-28 17:54:59,672] INFO [BrokerLifecycleManager id=0] Unable to register broker 0 because the controller returned error UNSUPPORTED_VERSION (kafka.server.BrokerLifecycleManager:66)
Reviewers: dengziming <dengziming1993@gmail.com>, Federico Valeri <fvaleri@redhat.com>
Add infrastructure for sending UpdateMetadataRequest and LeaderAndIsr RPCs during the migration
process from ZK to KRaft. The new classes use ControllerChannelManager to send the RPCs. The
information to send comes from MetadataDelta and MetadataImage.
Reviewers: David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Additional notable changes to fix multiple dependency ordering issues:
* Moved `ConfigSynonym` to `server-common`
* Moved synonyms from `LogConfig` to `ServerTopicConfigSynonyms `
* Removed `LogConfigDef` `define` overrides and rely on
`ServerTopicConfigSynonyms` instead.
* Moved `LogConfig.extractLogConfigMap` to `KafkaConfig`
* Consolidated relevant defaults from `KafkaConfig`/`LogConfig` in the latter
* Consolidated relevant config name definitions in `TopicConfig`
* Moved `ThrottledReplicaListValidator` to `storage`
Reviewers: Satish Duggana <satishd@apache.org>, Mickael Maison <mickael.maison@gmail.com>
The controller metrics in the controllers have three problems: 1) the active controller exposes uncommitted data in the metrics; 2) the active controller doesn't update the metrics when the uncommitted data gets aborted; 3) the controller doesn't update the metrics when the entire state gets reset.
We fix these issues by only updating the metrics when processing committed metadata records and by resetting the metrics when the metadata state is reset.
This change adds a new type `ControllerMetricsManager` which processes committed metadata records and updates the metrics accordingly. This change also removes metrics updating responsibilities from the rest of the controller managers.
Reviewers: Ron Dagostino <rdagostino@confluent.io>
This PR implements the follower fetch protocol as mentioned in KIP-405.
Added a new version for the ListOffsets protocol to receive the local log start offset on the leader replica. This is used by follower replicas to find the local log start offset on the leader.
Added a new version for FetchRequest protocol to receive OffsetMovedToTieredStorageException error. This is part of the enhanced fetch protocol as described in KIP-405.
We introduced a new field localLogStartOffset to maintain the log start offset in the local logs. The existing logStartOffset will continue to be the log start offset of the effective log that includes the segments in remote storage.
When a follower receives OffsetMovedToTieredStorage, then it tries to build the required state from the leader and remote storage so that it can be ready to move to fetch state.
Introduced RemoteLogManager, which is responsible for:
* initializing RemoteStorageManager and RemoteLogMetadataManager instances
* receiving leader and follower replica events and partition stop events and acting on them
* providing APIs to fetch indexes and metadata about remote log segments
Follow-up PRs will add more functionality, like copying segments to tiered storage and retention checks to clean local and remote log segments. This will change the local log start offset and make sure the follower fetch protocol works fine for several cases.
You can look at the detailed protocol changes in KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-FollowerReplication
Co-authors: satishd@apache.org, kamal.chandraprakash@gmail.com, yingz@uber.com
Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Cong Ding <cong@ccding.com>, Tirtha Chatterjee <tirtha.p.chatterjee@gmail.com>, Yaodong Yang <yangyaodong88@gmail.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Jun Rao <junrao@gmail.com>
This PR introduces the new metadata loader and snapshot generator. For the time being, they are
only used by the controller, but a PR for the broker will come soon.
The new metadata loader supports adding and removing publishers dynamically. (In contrast, the old
loader only supported adding a single publisher.) It also passes along more information about each
new image that is published. This information can be found in the LogDeltaManifest and
SnapshotManifest classes.
The new snapshot generator replaces the previous logic for generating snapshots in
QuorumController.java and associated classes. The new generator is intended to be shared between
the broker and the controller, so it is decoupled from both.
There are a few small changes to the old snapshot generator in this PR. Specifically, we move the
batch processing time and batch size metrics out of BrokerMetadataListener.scala and into
BrokerServerMetrics.scala.
Finally, fix a case where we are using 'is' rather than '==' for a numeric comparison in
snapshot_test.py.
Reviewers: David Arthur <mumrah@gmail.com>
Prior to starting a KIP-866 migration, the ZK brokers must register themselves with the active
KRaft controller. The controller waits for all brokers to register in order to verify that all the
brokers can
A) Communicate with the quorum
B) Have the migration config enabled
C) Have the proper IBP set
This patch uses the new isMigratingZkBroker field in BrokerRegistrationRequest and
RegisterBrokerRecord. The type was changed from int8 to bool for BrokerRegistrationRequest (a
mistake from #12860). The ZK brokers use the existing BrokerLifecycleManager class to register and
heartbeat with the controllers.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Let `RaftClient.createSnapshot` take the snapshotId directly instead of the committed offset/epoch (which may not exist).
Reviewers: José Armando García Sancio <jsancio@apache.org>
Introduce MetadataProvenance to encapsulate the three-tuple of (offset, epoch, timestamp) that is
associated with each MetadataImage, as well as each on-disk snapshot. Also introduce a builder
for MetadataDelta.
Remove offset and epoch tracking from MetadataDelta. We do not really need to know this information
until we are creating the final MetadataImage object. Therefore, this bookkeeping should be done by
the metadata loading code, not inside the delta code, like the other bookkeeping. This simplifies a
lot of tests, as well as simplifying RecordTestUtils. It also makes more sense for snapshots, where
the offset and epoch are the same for every record.
Add ImageReWriter, an ImageWriter that applies records to a MetadataDelta. This is useful when you
need to create a MetadataDelta object that holds the contents of a MetadataImage. This will be
used in the new image loader code (coming soon).
Add ImageWriterOptionsTest to test ImageWriterOptions.
Reviewers: David Arthur <mumrah@gmail.com>
This patch adds support for reading and writing ZooKeeper metadata during a KIP-866 migration.
For reading metadata from ZK, methods from KafkaZkClient and ZkData are reused to ensure we are decoding the JSON consistently.
For writing metadata, we use a new multi-op transaction that ensures only a single controller is writing to ZK. This is similar to the existing multi-op transaction that KafkaController uses, but it also includes a check on the new "/migration" ZNode. The transaction consists of three operations:
* CheckOp on /controller_epoch
* SetDataOp on /migration with zkVersion
* CreateOp/SetDataOp/DeleteOp (the actual operation being applied)
In the case of a batch of operations (such as topic creation), only the final MultiOp has a SetDataOp on /migration while the other requests use a CheckOp (similar to /controller_epoch).
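A rough sketch of what a single fenced write could look like using the raw ZooKeeper multi-op API (paths follow the description above; versions, payloads, and connection details are illustrative, and the real code goes through Kafka's own ZK client wrappers):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class MigrationMultiOpSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });

        int controllerEpochZkVersion = 5;   // illustrative ZK versions
        int migrationZkVersion = 12;
        byte[] migrationState = "{\"kraft_controller_epoch\":42}".getBytes(StandardCharsets.UTF_8);
        byte[] topicData = "{}".getBytes(StandardCharsets.UTF_8); // illustrative payload

        // All three ops succeed or fail together, so only the controller that still owns
        // /controller_epoch and /migration can apply the actual write.
        List<Op> ops = Arrays.asList(
            Op.check("/controller_epoch", controllerEpochZkVersion),
            Op.setData("/migration", migrationState, migrationZkVersion),
            Op.create("/brokers/topics/foo", topicData,
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
        zk.multi(ops);
        zk.close();
    }
}
```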
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>