Commit Graph

519 Commits

Author SHA1 Message Date
Colin P. McCabe 7049333617 KAFKA-14943: Fix ClientQuotaControlManager validation
Don't allow setting negative or zero values for quotas. Don't allow SCRAM mechanism names to be
used as client quota names. SCRAM mechanisms are not client quotas. (The confusion arose because of
internal ZK representation details that treated them both as "client configs.")

Add unit tests for ClientQuotaControlManager.isValidIpEntity and
ClientQuotaControlManager.configKeysForEntityType.

This change doesn't affect metadata record application, only input validation. If there are bad
client quotas that are set currently, this change will not alter the current behavior (of throwing
an exception and ignoring the bad quota).
2023-04-27 10:42:32 -07:00
David Arthur c1b5c75d92
KAFKA-14805 KRaft controller supports pre-migration mode (#13407)
This patch adds the concept of pre-migration mode to the KRaft controller. While in this mode, 
the controller will only allow certain write operations. The purpose of this is to disallow metadata 
changes when the controller is waiting for the ZK migration records to be committed.

The following ControllerWriteEvent operations are permitted in pre-migration mode

* completeActivation
* maybeFenceReplicas
* writeNoOpRecord
* processBrokerHeartbeat
* registerBroker (only for migrating ZK brokers)
* unregisterBroker

Raft events and other controller events do not follow the same code path as ControllerWriteEvent, 
so they are not affected by this new behavior.

This patch also add a new metric as defined in KIP-868: kafka.controller:type=KafkaController,name=ZkMigrationState

In order to support upgrades from 3.4.0, this patch also redefines the enum value of value 1 to mean 
MIGRATION rather than PRE_MIGRATION.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2023-04-26 10:20:30 -04:00
Manyanda Chitimbo dd63d88ac3
MINOR: fix noticed typo in raft and metadata projects (#13612)
Reviewers: Josep Prat <jlprat@apache.org>
2023-04-21 15:02:06 +02:00
David Jacot 2d0b816150
MINOR: Move `ControllerPurgatory` to `server-common` (#13555)
This patch renames from `ControllerPurgatory` to `DeferredEventQueue` and moves it from the `metadata` module to `server-common` module.

Reviewers: Alexandre Dupriez <alexandre.dupriez@gmail.com>, Ziming Deng <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@apache.org>
2023-04-21 11:19:04 +02:00
Purshotam Chauhan df13775254
KAFKA-14828: Remove R/W locks using persistent data structures (#13437)
Currently, StandardAuthorizer uses a R/W lock for maintaining the consistency of data. For the clusters with very high traffic, we will typically see an increase in latencies whenever a write operation comes. The intent of this PR is to get rid of the R/W lock with the help of immutable or persistent collections. Basically, new object references are used to hold the intermediate state of the write operation. After the completion of the operation, the main reference to the cache is changed to point to the new object. Also, for the read operation, the code is changed such that all accesses to the cache for a single read operation are done to a particular cache object only.

In the PR description, you can find the performance of various libraries at the time of both read and write. Read performance is checked with the existing AuthorizerBenchmark. For write performance, a new AuthorizerUpdateBenchmark has been added which evaluates the performance of the addAcl operation.


Reviewers:  Ron Dagostino <rndgstn@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>,  Divij Vaidya <diviv@amazon.com>
2023-04-21 14:08:23 +05:30
Proven Provenzano abca86511e
KAFKA-14881: Rework UserScramCredentialRecord (#13513)
Rework UserScramCredentialRecord to store serverKey and StoredKey rather than saltedPassword. This
is necessary to support migration from ZK, since those are the fields we stored in ZK.  Update
latest MetadataVersion to IBP_3_5_IV2 and make SCRAM support conditional on this version.  Moved
ScramCredentialData.java from org.apache.kafka.image to org.apache.kafka.metadata, which seems more
appropriate.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-04-18 09:41:38 -07:00
Manyanda Chitimbo b36a170aa3
MINOR: fix typos in MigrationClient, StandardAuthorizer, StandardAuthorizerData and KafkaConfigSchema files (#13593)
Reviewers: Luke Chen <showuon@gmail.com>
2023-04-18 19:36:56 +08:00
Ron Dagostino e27926f92b
KAFKA-14735: Improve KRaft metadata image change performance at high … (#13280)
topic counts.

Introduces the use of persistent data structures in the KRaft metadata image to avoid copying the entire TopicsImage upon every change.  Performance that was O(<number of topics in the cluster>) is now O(<number of topics changing>), which has dramatic time and GC improvements for the most common topic-related metadata events.  We abstract away the chosen underlying persistent collection library via ImmutableMap<> and ImmutableSet<> interfaces and static factory methods.

Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>, Purshotam Chauhan <pchauhan@confluent.io>
2023-04-17 17:52:28 -04:00
andymg3 c4ad09e47d
MINOR: Add more KRaft reassignment tests (#13521)
Although KAFKA-14808 did not affect KRaft mode, it is important to ensure that we have regression
tests in KRaft mode to prevent a similar bug from appearing there in the future. This PR adds two
tests. First, it adds a test that makes sure we handle what happens when a reassignment completes
and none of the new replicas can be made leader. It's important that we dont keep an old replica as
leader. Second, it adds a test that makes sure we handle new reassignments that don't include a
previous assignment replica that was leader.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-04-12 12:00:35 -07:00
Colin Patrick McCabe f1f35ef1a8
KAFKA-14894: MetadataLoader must call finishSnapshot after loading a snapshot (#13541)
The MetadataLoader must call finishSnapshot after loading a snapshot. This function removes
whatever was in the old snapshot that is not in the new snapshot that was just loaded. While this
is not significant when the old snapshot was the empty snapshot, it is important to do when we are
loading a snapshot on top of an existing non-empty image.

In initializeNewPublishers, the newly installed publishers should be given a MetadataDelta based on
MetadataImage.EMPTY, reflecting the fact that they are seeing everything for the first time.

Reviewers: David Arthur <mumrah@gmail.com>
2023-04-11 15:02:33 -07:00
José Armando García Sancio 672dd3ab6a
KAFKA-13020; Implement reading Snapshot log append timestamp (#13345)
The SnapshotReader exposes the "last contained log time". This is mainly used during snapshot cleanup. The previous implementation used the append time of the snapshot record. This is not accurate as this is the time when the snapshot was created and not the log append time of the last record included in the snapshot.

The log append time of the last record included in the snapshot is store in the header control record of the snapshot. The header control record is the first record of the snapshot.

To be able to read this record, this change extends the RecordsIterator to decode and expose the control records in the Records type.

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
2023-04-07 09:25:54 -07:00
Calvin Liu 8c88cdb718
KAFKA-14617: Update AlterPartitionRequest and enable Kraft controller to reject stale request. (#13408)
Second part of the [KIP-903](https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR), it updates the AlterPartitionRequest:
- Deprecate the NewIsr field
- Create a new field BrokerState with BrokerId and BrokerEpoch
- Bump the AlterPartition version to 3

With this change, the Quorum Controller is enabled to reject stale AlterPartition request.

Reviewers: Jun Rao <junrao@gmail.com>, David Jacot <djacot@confluent.io>
2023-03-31 11:27:42 +02:00
andymg3 887d05559f
MINOR: Create only one FeatureControlManager instance in ReplicationControlManagerTest (#13468)
This is a small patch to make it so we only create one FeatureControlManager instance in ReplicationControlManagerTest. Currently we create two, which isn't needed. Its also a bit confusing because the ReplicationControlTestContext objects ends up having a different FeatureControlManager reference that the one its own ReplicationControlManager instance has a reference to.

Reviewers: José Armando García Sancio <jsancio@apache.org>, dengziming <dengziming1993@gmail.com>
2023-03-29 19:10:03 -07:00
Colin Patrick McCabe 09e59bc776
KAFKA-14857: Fix some MetadataLoader bugs (#13462)
The MetadataLoader is not supposed to publish metadata updates until we have loaded up to the high
water mark. Previously, this logic was broken, and we published updates immediately. This PR fixes
that and adds a junit test.

Another issue is that the MetadataLoader previously assumed that we would periodically get
callbacks from the Raft layer even if nothing had happened. We relied on this to install new
publishers in a timely fashion, for example. However, in older MetadataVersions that don't include
NoOpRecord, this is not a safe assumption.

Aside from the above changes, also fix a deadlock in SnapshotGeneratorTest, fix the log prefix for
BrokerLifecycleManager, and remove metadata publishers on brokerserver shutdown (like we do for
controllers).

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
2023-03-29 12:30:12 -07:00
andymg3 379b6978a0
KAFKA-14829: Consolidate reassignment logic into PartitionReassignmentReplicas (#13440)
Currently, we have various bits of reassignment logic spread across different classes. For example, ReplicationControlManager contains logic for when a reassignment is in progress, which is duplication in PartitionChangeBuilder. Another example is PartitionReassignmentRevert which contains logic for how to undo/revert a reassignment. The idea here is to move the logic to PartitionReassignmentReplicas so it's more testable and easier to reason about.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2023-03-29 10:12:40 -07:00
David Arthur f1b3732fa6
KAFKA-14796 Migrate ACLs from AclAuthorizor to KRaft (#13368)
This patch refactors the loadCache method in AclAuthorizer to make it reusable by ZkMigrationClient.
The loaded ACLs are converted to AccessControlEntryRecord. I noticed we still have the defunct
AccessControlRecord, so I've deleted it.

Also included here are the methods to write ACL changes back to ZK while in dual-write mode.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Colin P. McCabe <cmccabe@apache.org>
2023-03-27 16:12:02 -07:00
Colin Patrick McCabe ed400e4c0d
KAFKA-14835: Create ControllerMetadataMetricsPublisher (#13438)
Separate out KRaft controller metrics into two groups: metrics directly managed by the
QuorumController, and metrics handled by an external publisher. This separation of concerns makes
the code easier to reason about, by clarifying what metrics can be changed where.

The external publisher, ControllerServerMetricsPublisher, handles all metrics which are related to
the content of metadata. For example, metrics about number of topics or number of partitions, etc.
etc. It fits into the MetadataLoader metadata publishing framework as another publisher.  Since
ControllerServerMetricsPublisher operates off of a MetadataImage, we don't have to create
(essentially) another copy of the metadata in memory, as ControllerMetricsManager. This reduces
memory consumption. Another benefit of operating off of the MetadataImage is that we don't have to
have special handling for each record type, like we do now in ControllerMetricsManager.

Reviewers: David Arthur <mumrah@gmail.com>
2023-03-24 11:26:53 -07:00
andymg3 df5850274d
MINOR: Expand use of PartitionAssignment (#13402)
Updates ReplicationControlManager and PartitionReassignmentReplicas to use PartitionAssignment.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2023-03-20 13:44:54 -07:00
Colin Patrick McCabe ddd652c672
MINOR: Standardize KRaft logging, thread names, and terminology (#13390)
Standardize KRaft thread names.

- Always use kebab case. That is, "my-thread-name".

- Thread prefixes are just strings, not Option[String] or Optional<String>.
  If you don't want a prefix, use the empty string.

- Thread prefixes end in a dash (except the empty prefix). Then you can
  calculate thread names as $prefix + "my-thread-name"

- Broker-only components get "broker-$id-" as a thread name prefix. For example, "broker-1-"

- Controller-only components get "controller-$id-" as a thread name prefix. For example, "controller-1-"

- Shared components get "kafka-$id-" as a thread name prefix. For example, "kafka-0-"

- Always pass a prefix to KafkaEventQueue, so that threads have names like
  "broker-0-metadata-loader-event-handler" rather than "event-handler". Prior to this PR, we had
  several threads just named "EventHandler" which was not helpful for debugging.

- QuorumController thread name is "quorum-controller-123-event-handler"

- Don't set a thread prefix for replication threads started by ReplicaManager. They run only on the
  broker, and already include the broker ID.

Standardize KRaft slf4j log prefixes.

- Names should be of the form "[ComponentName id=$id] ". So for a ControllerServer with ID 123, we
  will have "[ControllerServer id=123] "

- For the QuorumController class, use the prefix "[QuorumController id=$id] " rather than
  "[Controller <nodeId] ", to make it clearer that this is a KRaft controller.

- In BrokerLifecycleManager, add isZkBroker=true to the log prefix for the migration case.

Standardize KRaft terminology.

- All synonyms of combined mode (colocated, coresident, etc.) should be replaced by "combined"

- All synonyms of isolated mode (remote, non-colocated, distributed, etc.) should be replaced by
  "isolated".
2023-03-16 15:33:03 -07:00
David Arthur 5dcdf71dec
MINOR: Improved error handling in ZK migration (#13372)
This patch fixes many small issues to improve error handling and logging during the ZK migration. A test was added
to simulate a ZK session expiration to ensure the correctness of the migration driver.

With this change, ZK errors thrown during the migration will not hit the fault handler registered with with
KRaftMigrationDriver, but they will be logged.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-03-16 14:21:18 -07:00
Calvin Liu 79b5f7f1ce
KAFKA-14617: Add ReplicaState to FetchRequest (KIP-903) (#13323)
This patch is the first part of KIP-903. It updates the FetchRequest to include the new tagged ReplicaState field which replaces the now deprecated ReplicaId field. The FetchRequest version is bumped to version 15 and the MetadataVersion to 3.5-IV1.

Reviewers: David Jacot <djacot@confluent.io>
2023-03-16 14:04:34 +01:00
Colin Patrick McCabe aaa976a340
MINOR: Some metadata publishing fixes and refactors (#13337)
This PR refactors MetadataPublisher's interface a bit. There is now an onControllerChange
callback. This is something that some publishers might want. A good example is ZkMigrationClient.
Instead of two different publish functions (one for snapshots, one for log deltas), we now have a single onMetadataUpdate function. Most publishers didn't want to do anything different in those two cases.
The ones that do want to do something different for snapshots can always check the manifest type.
The close function now has a default empty implementation, since most publishers didn't need to do
anything there.

Move the SCRAM logic out of BrokerMetadataPublisher and run it on the controller as well.

On the broker, simply use dynamicClientQuotaPublisher to handle dynamic client quotas changes.
That is what the controller already does, and the code is exactly the same in both cases.

Fix the logging in FutureUtils.waitWithLogging a bit. Previously, when invoked from BrokerServer
or ControllerServer, it did not include the standard "[Controller 123] " style prefix indicating server
name and ID. This was confusing, especially when debugging junit tests.

Reviewers: Ron Dagostino <rdagostino@confluent.io>, David Arthur <mumrah@gmail.com>
2023-03-09 14:52:40 -08:00
andymg3 1394675900
MINOR: Add unclean field of PartitionReassignmentRevert to hashCode Equals and toString (#13370)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-03-09 10:26:41 -08:00
Ron Dagostino e3817cac89
KAFKA-14351: Controller Mutation Quota for KRaft (#13116)
Implement KIP-599 controller mutation quotas for the KRaft controller. These quotas apply to create
topics, create partitions, and delete topic operations. They are specified in terms of number of
partitions.

The approach taken here is to reuse the ControllerMutationQuotaManager that is also used in ZK
mode. The quotas are implemented as Sensor objects and Sensor.checkQuotas enforces the quota,
whereas Sensor.record notes that new partitions have been modified. While ControllerApis handles
fetching the Sensor objects, we must make the final callback to check the quotas from within
QuorumController. The reason is because only QuorumController knows the final number of partitions
that must be modified. (As one example, up-to-date information about the number of partitions that
will be deleted when a topic is deleted is really only available in QuorumController.)

For quota enforcement, the logic is already in place. The KRaft controller is expected to set the
throttle time in the response that is embedded in EnvelopeResponse, but it does not actually apply
the throttle because there is no client connection to throttle. Instead, the broker that forwarded
the request is expected to return the throttle value from the controller and to throttle the client
connection. It also applies its own request quota, so the enforced/returned quota is the maximum of
the two.

This PR also installs a DynamicConfigPublisher in ControllerServer. This allows dynamic
configurations to be published on the controller. Previously, they could be set, but they were not
applied. Note that we still don't have a good way to set node-level configurations for isolatied
controllers. However, this will allow us to set cluster configs (aka default node configs) and have
them take effect on the controllers.

In a similar vein, this PR separates out the dynamic client quota publisher logic used on the
broker into DynamicClientQuotaPublisher. We can now install this on both BrokerServer and
ControllerServer. This makes dynamically configuring quotas (such as controller mutation quotas)
possible.

Also add a ducktape test, controller_mutation_quota_test.py.

Reviewers: David Jacot <djacot@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Colin P. McCabe <cmccabe@apache.org>
2023-03-07 11:25:34 -08:00
Christo Lolov 5b295293c0
MINOR: Remove unnecessary toString(); fix comment references (#13212)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>, Lucas Brutschy <lbrutschy@confluent.io>
2023-03-06 18:39:04 +01:00
Proven Provenzano 38c409cf33
KAFKA-14084: SCRAM support in KRaft. (#13114)
This commit adds support to store the SCRAM credentials in a cluster with KRaft quorum servers and
no ZK cluster backing the metadata. This includes creating ScramControlManager in the controller,
and adding support for SCRAM to MetadataImage and MetadataDelta.

Change UserScramCredentialRecord to contain only a single tuple (name, mechanism, salt, pw, iter)
rather than a mapping between name and a list. This will avoid creating an excessively large record
if a single user has many entries. Because record ID 11 (UserScramCredentialRecord) has not been
used before, this is a compatible change. SCRAM will be supported in 3.5-IV0 and later.

This commit does not include KIP-900 SCRAM bootstrapping support, or updating the credential cache
on the controller (as opposed to broker). We will implement these in follow-on commits.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2023-03-03 10:23:34 -08:00
Colin P. McCabe 6b89672b5e MINOR: some ZK migration code cleanups.
Some minor improvements to the JavaDoc for ZkMigrationState.

Rename MigrationState to MigrationDriverState to avoid confusion with ZkMigrationState.

Remove ClusterImage#zkBrokers. This costs O(num_brokers) time to calculate, but is only ever used
when in migration state. It should just be calculated in the migration code. (Additionally, the
function ClusterImage.zkBrokers() returns something other than ClusterImage#zkBrokers, which is
confusing.)

Also remove ClusterDelta#liveZkBrokerIdChanges. This is only used in one place, and it's easy to
calculate it there. In general we should avoid providing expensive accessors unless absolutely
necessary. Expensive code should look expensive: if people want to iterate over all brokers, they
can write a loop to do that rather than hiding it inside an accessor.
2023-02-28 13:59:07 -08:00
Purshotam Chauhan c39123d83d
KAKFA-14733: Added a few missing checks for Kraft Authorizer and updated AclAuthorizerTest to run tests for both zk and kraft (#13282)
Added the following checks - 
* In StandardAuthorizerData.authorize() to fail if `patternType` other than `LITERAL` is passed.
* In AclControlManager.addAcl() to fail if Resource Name is null or empty.

Also, updated `AclAuthorizerTest` includes a lot of tests covering various scenarios that are missing in `StandardAuthorizerTest`. This PR changes the AclAuthorizerTest to run tests for both `zk` and `kraft` modes - 
* Rename AclAuthorizerTest -> AuthorizerTest
* Parameterize relevant tests to run for both modes

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
2023-02-21 19:21:15 +05:30
Christo Lolov ba0c5b0902
MINOR: Simplify JUnit assertions in tests; remove accidental unnecessary code in tests (#13219)
* assertEquals called on array
* Method is identical to its super method
* Simplifiable assertions
* Unused imports

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
2023-02-16 16:13:31 +01:00
David Arthur cb4d9d1abf
KAFKA-14668 Avoid unnecessary UMR during ZK migration (#13183)
Only send UMR to ZK brokers if the cluster metadata or topic metadata has changed.

Reviewers: Akhilesh C <akhileshchg@users.noreply.github.com>, Colin P. McCabe <cmccabe@apache.org>
2023-02-09 13:24:02 -05:00
Christo Lolov a0a9b6ffea
MINOR: Remove unnecessary code (#13210)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
2023-02-07 17:37:45 +01:00
Ron Dagostino 6d11261d5d
MINOR: IBP_3_4_IV1 should be IBP_3_5_IV0 because it is not in 3.4 (#13198)
The KIP-405 MetadataVersion changes will be released as part of AK 3.5, but were added as BP_3_4_IV1.
This change fixes them to be IBP_3_5_IV0. There is no incompatibility  because this feature has not yet
been released. Also set didMetadataChange to false because KRaft metadata log records did not change.

Reviewers: Satish Duggana <satishd@apache.org>, Christo Lolov <christo_lolov@yahoo.com>, Colin P. McCabe <cmccabe@apache.org>
2023-02-06 10:37:50 -08:00
David Arthur 89a4735c35
KAFKA-14656: Send UMR first during ZK migration (#13159)
When in migration-from-ZK mode and sending RPCs to ZK-based brokers, the KRaft controller must send
full UpdateMetadataRequests prior to sending full LeaderAndIsrRequests. If the controller sends the
requests in the other order, and the ZK-based broker does not already know about some of the nodes
referenced in the LeaderAndIsrRequest, it will reject the request.

This PR includes an integration test, and a number of other small fixes for dual-write.

Co-authored-by: Akhilesh C <akhileshchg@users.noreply.github.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-01-30 22:31:45 -08:00
José Armando García Sancio 058d8d530b
KAFKA-14618; Fix off by one error in snapshot id (#13108)
The KRaft client expects the offset of the snapshot id to be an end offset. End offsets are
exclusive. The MetadataProvenance type was createing a snapshot id using the last contained offset
which is inclusive. This change fixes that and renames some of the fields to make this difference
more obvious.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-01-13 10:06:38 -08:00
andymg3 0d9a7022a4
KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created (#13104)
### JIRA
https://issues.apache.org/jira/browse/KAFKA-14612

### Details
Makes sure we emit `ConfigRecord`s for a topic iff it actually gets created. Currently, we might emit `ConfigRecord`s even if the topic creation fails later in the `createTopics` method.

I created a new method `incrementalAlterConfig` in `ConfigurationControlManager` that is similar to `incrementalAlterConfig` but it just handles one config at a time. This is used in `ReplicationControlManager` for each topic. By handling one topic's config at a time, it's easier to isolate each topic's config records. This enables us to make sure we only write config records for topics that get created.

I refactored `incrementalAlterConfigResource` to return an `ApiError`. This made it easier to implement the new method `incrementalAlterConfig` in `ConfigurationControlManager` because it then doesnt have to search in the `Map` for the result.

### Testing
Enhanced pre-existing test `ReplicationControlManagerTest.testCreateTopicsWithConfigs`. I ran the tests without the changes to `ReplicationControlManager` and made sure each assertion ends up failing. Also ran `./gradlew metadata:test --tests org.apache.kafka.controller.ReplicationControlManagerTest`.

Reviewers: Jason Gustafson <jason@confluent.io>
2023-01-12 16:23:57 -08:00
Colin Patrick McCabe 8478bbb589
KAFKA-14601: Improve exception handling in KafkaEventQueue #13089
If KafkaEventQueue gets an InterruptedException while waiting for a condition variable, it
currently exits immediately. Instead, it should complete the remaining events exceptionally and
then execute the cleanup event. This will allow us to finish any necessary cleanup steps.

In order to do this, we require the cleanup event to be provided when the queue is contructed,
rather than when it's being shut down.

Also, handle cases where Event#handleException itself throws an exception.

Remove timed shutdown from the event queue code since nobody was using it, and it adds complexity.

Add server-common/src/test/resources/test/log4j.properties since this gradle module somehow avoided
having a test log4j.properties up to this point.

Reviewers: David Arthur <mumrah@gmail.com>
2023-01-12 10:03:14 -08:00
David Arthur 0bb05d8679
KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (#13103)
With the new broker epoch validation logic introduced in #12998, we no longer need the ZK broker epoch to be sent to the KRaft controller. This patch removes that epoch and replaces it with a boolean.

Another small fix is included in this patch for controlled shutdown in migration mode. Previously, if a ZK broker was in migration mode, it would always try to do controlled shutdown via BrokerLifecycleManager. Since there is no ordering dependency between bringing up ZK brokers and the KRaft quorum during migration, a ZK broker could be running in migration mode, but talking to a ZK controller. A small check was added to see if the current controller is ZK or KRaft before decided which controlled shutdown to attempt.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-01-11 14:36:56 -05:00
andymg3 43f531d87a
MINOR: Implement toString method for TopicAssignment and PartitionAssignment (#13101)
Implements `toString` method for classes `TopicAssignment` and` PartitionAssignment`. Also removes the `final` keyword from the constructor arguments for consistency.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2023-01-10 10:00:59 -08:00
Akhilesh C db49070760
KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller. (#12998)
This patch introduces a preliminary state machine that can be used by KRaft
controller to drive online migration from Zk to KRaft.

MigrationState -- Defines the states we can have while migration from Zk to
KRaft.

KRaftMigrationDriver -- Defines the state transitions, and events to handle
actions like controller change, metadata change, broker change and have
interfaces through which it claims Zk controllership, performs zk writes and
sends RPCs to ZkBrokers.

MigrationClient -- Interface that defines the functions used to claim and
relinquish Zk controllership, read to and write from Zk.

Co-authored-by: David Arthur <mumrah@gmail.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2023-01-09 10:44:11 -08:00
Luke Chen 6b5e9e989b
MINOR: add error reason when controller failed to handle events (#13050)
In KRaft, when controller failed to handle events, we'll log error and return back to brokers. But in some cases, we only log error class name, and return error class name back to brokers, which is un-useful for troubleshooting. Ex: When broker registration failed with unsupported version error, it showed:

2022-12-28T17:46:42.876+0800 [DEBUG] [TestEventLogger]     [2022-12-28 17:46:42,877] INFO [Controller 3000] registerBroker: failed with UnsupportedVersionException in 2888 us (org.apache.kafka.controller.QuorumController:447)

2022-12-28T17:46:42.877+0800 [DEBUG] [TestEventLogger]     [2022-12-28 17:46:42,878] INFO [BrokerLifecycleManager id=0] Unable to register broker 0 because the controller returned error UNSUPPORTED_VERSION (kafka.server.BrokerLifecycleManager:66)

Checking the logs, we still don't know which version it supports.
After this PR, it will show:

2022-12-28T17:54:59.671+0800 [DEBUG] [TestEventLogger]     [2022-12-28 17:54:59,671] INFO [Controller 3000] registerBroker: failed with UnsupportedVersionException in 291 us. Reason: Unable to register because the broker does not support version 8 of metadata.version. It wants a version between 4 and 4, inclusive. (org.apache.kafka.controller.QuorumController:447)

2022-12-28T17:54:59.671+0800 [DEBUG] [TestEventLogger]     [2022-12-28 17:54:59,672] INFO [BrokerLifecycleManager id=0] Unable to register broker 0 because the controller returned error UNSUPPORTED_VERSION (kafka.server.BrokerLifecycleManager:66)

Reviewers: dengziming <dengziming1993@gmail.com>, Federico Valeri <fvaleri@redhat.com >
2023-01-07 09:27:59 +08:00
Akhilesh Chaganti 0e51a2026c KAFKA-14458: Introduce RPC support during ZK migration #13028
Add infrastructure for sending UpdateMetadataRequest and LeaderAndIsr RPCs during the migration
process from ZK to KRaft. The new classes use ControllerChannelManager to send the RPCs.  The
information to send comes from MetadataDelta and MetadataImage.

Reviewers: David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2023-01-04 16:54:58 -08:00
Ismael Juma 96d9710c17
KAFKA-14478: Move LogConfig/CleanerConfig and related to storage module (#13049)
Additional notable changes to fix multiple dependency ordering issues:

* Moved `ConfigSynonym` to `server-common`
* Moved synonyms from `LogConfig` to `ServerTopicConfigSynonyms `
* Removed `LogConfigDef` `define` overrides and rely on
   `ServerTopicConfigSynonyms` instead.
* Moved `LogConfig.extractLogConfigMap` to `KafkaConfig`
* Consolidated relevant defaults from `KafkaConfig`/`LogConfig` in the latter
* Consolidate relevant config name definitions in `TopicConfig`
* Move `ThrottledReplicaListValidator` to `storage`

Reviewers: Satish Duggana <satishd@apache.org>, Mickael Maison <mickael.maison@gmail.com>
2023-01-04 02:42:52 -08:00
José Armando García Sancio 44b3177a08
KAFKA-14457; Controller metrics should only expose committed data (#12994)
The controller metrics in the controllers has three problems. 1) the active controller exposes uncommitted data in the metrics. 2) the active controller doesn't update the metrics when the uncommitted data gets aborted. 3) the controller doesn't update the metrics when the entire state gets reset.

We fix these issues by only updating the metrics when processing committed metadata records and reset the metrics when the metadata state is reset.

This change adds a new type `ControllerMetricsManager` which processes committed metadata records and updates the metrics accordingly. This change also removes metrics updating responsibilities from the rest of the controller managers. 

Reviewers: Ron Dagostino <rdagostino@confluent.io>
2022-12-20 10:55:14 -08:00
Satish Duggana 7146ac57ba
[KAFKA-13369] Follower fetch protocol changes for tiered storage. (#11390)
This PR implements the follower fetch protocol as mentioned in KIP-405.

Added a new version for ListOffsets protocol to receive local log start offset on the leader replica. This is used by follower replicas to find the local log star offset on the leader.

Added a new version for FetchRequest protocol to receive OffsetMovedToTieredStorageException error. This is part of the enhanced fetch protocol as described in KIP-405.

We introduced a new field locaLogStartOffset to maintain the log start offset in the local logs. Existing logStartOffset will continue to be the log start offset of the effective log that includes the segments in remote storage.

When a follower receives OffsetMovedToTieredStorage, then it tries to build the required state from the leader and remote storage so that it can be ready to move to fetch state.

Introduced RemoteLogManager which is responsible for

initializing RemoteStorageManager and RemoteLogMetadataManager instances.
receives any leader and follower replica events and partition stop events and act on them
also provides APIs to fetch indexes, metadata about remote log segments.
Followup PRs will add more functionality like copying segments to tiered storage, retention checks to clean local and remote log segments. This will change the local log start offset and make sure the follower fetch protocol works fine for several cases.

You can look at the detailed protocol changes in KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-FollowerReplication

Co-authors: satishd@apache.org, kamal.chandraprakash@gmail.com, yingz@uber.com

Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Cong Ding <cong@ccding.com>, Tirtha Chatterjee <tirtha.p.chatterjee@gmail.com>, Yaodong Yang <yangyaodong88@gmail.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Jun Rao <junrao@gmail.com>
2022-12-17 09:36:44 -08:00
Colin Patrick McCabe 29c09e2ca1
MINOR: ControllerServer should use the new metadata loader and snapshot generator (#12983)
This PR introduces the new metadata loader and snapshot generator. For the time being, they are
only used by the controller, but a PR for the broker will come soon.

The new metadata loader supports adding and removing publishers dynamically. (In contrast, the old
loader only supported adding a single publisher.) It also passes along more information about each
new image that is published. This information can be found in the LogDeltaManifest and
SnapshotManifest classes.

The new snapshot generator replaces the previous logic for generating snapshots in
QuorumController.java and associated classes. The new generator is intended to be shared between
the broker and the controller, so it is decoupled from both.

There are a few small changes to the old snapshot generator in this PR. Specifically, we move the
batch processing time and batch size metrics out of BrokerMetadataListener.scala and into
BrokerServerMetrics.scala.

Finally, fix a case where we are using 'is' rather than '==' for a numeric comparison in
snapshot_test.py.

Reviewers: David Arthur <mumrah@gmail.com>
2022-12-15 16:53:07 -08:00
David Arthur 67c72596af
KAFKA-14448 Let ZK brokers register with KRaft controller (#12965)
Prior to starting a KIP-866 migration, the ZK brokers must register themselves with the active
KRaft controller. The controller waits for all brokers to register in order to verify that all the
brokers can

A) Communicate with the quorum
B) Have the migration config enabled
C) Have the proper IBP set

This patch uses the new isMigratingZkBroker field in BrokerRegistrationRequest and
RegisterBrokerRecord. The type was changed from int8 to bool for BrokerRegistrationRequest (a
mistake from #12860). The ZK brokers use the existing BrokerLifecycleManager class to register and
heartbeat with the controllers.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2022-12-13 13:15:21 -08:00
Jason Gustafson 26a4d42072
MINOR: Pass snapshot ID directly in `RaftClient.createSnapshot` (#12981)
Let `RaftClient.createSnapshot` take the snapshotId directly instead of the committed offset/epoch (which may not exist). 

Reviewers: José Armando García Sancio <jsancio@apache.org>
2022-12-13 10:44:56 -08:00
Colin Patrick McCabe b2dea17041
MINOR: Introduce MetadataProvenance and ImageReWriter (#12964)
Introduce MetadataProvenance to encapsulate the three-tuple of (offset, epoch, timestamp) that is
associated with each MetadataImage, as well as each on-disk snapshot. Also introduce a builder
for MetadataDelta.

Remove offset and epoch tracking from MetadataDelta. We do not really need to know this information
until we are creating the final MetadataImage object. Therefore, this bookkeeping should be done by
the metadata loading code, not inside the delta code, like the other bookkeeping. This simplifies a
lot of tests, as well as simplifying RecordTestUtils.  It also makes more sense for snapshots, where
the offset and epoch are the same for every record.

Add ImageReWriter, an ImageWriter that applies records to a MetadataDelta. This is useful when you
need to create a MetadataDelta object that holds the contents of a MetadataImage. This will be
used in the new image loader code (coming soon).

Add ImageWriterOptionsTest to test ImageWriterOptions.

Reviewers: David Arthur <mumrah@gmail.com>
2022-12-12 09:52:06 -08:00
Purshotam Chauhan c6590ee28b KAFKA-14435: Fix `allow.everyone.if.no.acl.found` config behavior for StandardAuthorizer
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Colin Patrick McCabe <cmccabe@apache.org>
2022-12-09 00:58:13 +05:30
David Arthur d40561e90a
KAFKA-14427 ZK client support for migrations (#12946)
This patch adds support for reading and writing ZooKeeper metadata during a KIP-866 migration.

For reading metadata from ZK, methods from KafkaZkClient and ZkData are reused to ensure we are decoding the JSON consistently.

For writing metadata, we use a new multi-op transaction that ensures only a single controller is writing to ZK. This is similar to the existing multi-op transaction that KafkaController uses, but it also includes a check on the new "/migration" ZNode. The transaction consists of three operations:

* CheckOp on /controller_epoch
* SetDataOp on /migration with zkVersion
* CreateOp/SetDataOp/DeleteOp (the actual operation being applied)

In the case of a batch of operations (such as topic creation), only the final MultiOp has a SetDataOp on /migration while the other requests use a CheckOp (similar to /controller_epoch).

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>
2022-12-08 13:14:01 -05:00
andymg3 e96e42ead4
KAFKA-14386; Return TopicAssignment from the ReplicaPlacer (#12892)
This changes the ReplicaPlacer interface to return a class instead of a list of list of integers. There are two reasons for the suggestion. First, as mentioned in the JIRA, it will make the interface, arguably, a bit more readable and understandable by explicitly modeling the idea of topic and partition. Second and more importantly, it makes the interface more extendable in the future. Right now it would be challenging to add more metadata to the response.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2022-12-06 13:01:36 -08:00
David Arthur 7b7e40a536
KAFKA-14304 Add RPC changes, records, and config from KIP-866 (#12928)
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
2022-12-02 19:59:52 -05:00
Colin Patrick McCabe 5514f372b3
MINOR: extract jointly owned parts of BrokerServer and ControllerServer (#12837)
Extract jointly owned parts of BrokerServer and ControllerServer into SharedServer. Shut down
SharedServer when the last component using it shuts down. But make sure to stop the raft manager
before closing the ControllerServer's sockets.

This PR also fixes a memory leak where ReplicaManager was not removing some topic metric callbacks
during shutdown. Finally, we now release memory from the BatchMemoryPool in KafkaRaftClient#close.
These changes should reduce memory consumption while running junit tests.

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
2022-12-02 00:27:22 -08:00
José Armando García Sancio 72b535acaf
KAFKA-14307; Controller time-based snapshots (#12761)
Implement time based snapshot for the controller. The general strategy for this feature is that the controller will use the record-batch's append time to determine if a snapshot should be generated. If the oldest record that has been committed but is not included in the latest snapshot is older than `metadata.log.max.snapshot.interval.ms`, the controller will trigger a snapshot immediately. This is useful in case the controller was offline for more that `metadata.log.max.snapshot.interval.ms` milliseconds.

If the oldest record that has been committed but is not included in the latest snapshot is NOT older than `metadata.log.max.snapshot.interval.ms`, the controller will schedule a `maybeGenerateSnapshot` deferred task.

It is possible that when the controller wants to generate a new snapshot, either because of time or number of bytes, the controller is currently generating a snapshot. In this case the `SnapshotGeneratorManager` was changed so that it checks and potentially triggers another snapshot when the currently in-progress snapshot finishes.

To better support this feature the following additional changes were made:
1. The configuration `metadata.log.max.snapshot.interval.ms` was added to `KafkaConfig` with a default value of one hour.
2. `RaftClient` was extended to return the latest snapshot id. This snapshot id is used to determine if a given record is included in a snapshot.
3. Improve the `SnapshotReason` type to support the inclusion of values in the message.

Reviewers: Jason Gustafson <jason@confluent.io>, Niket Goel <niket-goel@users.noreply.github.com>
2022-11-21 17:30:50 -08:00
Colin Patrick McCabe ac3a3687a0
MINOR: Avoid highestSupportedVersion outside tests (#12829)
We should avoid using Message.highestSupportedVersion to generate metadata records. Instead, we
need to pick the correct record version based on the current metadata version which is in effect.
In cases where there is only one record version that we know how to generate, we can hard code
that version, but it should just be a simple constant zero.

Reviewers: Jason Gustafson <jason@confluent.io>
2022-11-08 08:38:47 -08:00
David Jacot bc780c7c32
MINOR: Move timeline data structures from metadata to server-common (#12811)
This path moves the timeline data structures from metadata module to server-common module as those will be used in the new group coordinator.

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Colin Patrick McCabe <cmccabe@apache.org>
2022-11-04 08:52:32 +01:00
Jason Gustafson cbe50d95a9
MINOR: Add test case for topic recreation with collision chars (#12796)
This patch adds a unit test for topic recreation with colliding characters (such as `.`). This was broken up until https://github.com/apache/kafka/pull/12790. 

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
2022-10-28 18:50:08 -07:00
Luke Chen c1bb307a36
KAFKA-14337; Correctly remove topicsWithCollisionChars after topic deletion (#12790)
In https://github.com/apache/kafka/pull/11910 , we added a feature to prevent topics with conflicting metrics names from being created. We added a map to store the normalized topic name to the topic names, but we didn't remove it correctly while deleting topics. This PR fixes this bug and add a test.

Reviewers: Igor Soarez <i@soarez.me>, dengziming <dengziming1993@gmail.com>, Jason Gustafson <jason@confluent.io>
2022-10-28 10:08:53 -07:00
Jason Gustafson b525ddc9f1
MINOR: Fix PartitionRegistration.hashCode (#12774)
`PartitionRegistration.hashCode` passes raw arrays to `Objects.hash` in the `hashCode` implementation. This doesn't work since the `equals` implementation uses `Arrays.equals`. We should use `Arrays.hashCode` instead. 

Reviewers: David Arthur <mumrah@gmail.com>
2022-10-20 14:01:01 -07:00
José Armando García Sancio d0ff869718
MINOR; Add accessor methods to OffsetAndEpoch (#12770)
Accessor are preferred over fields because they compose better with Java's
lambda syntax.

Reviewers: Jason Gustafson <jason@confluent.io>
2022-10-19 12:07:07 -07:00
Jason Gustafson a692223a44
KAFKA-14316; Fix feature control iterator metadata version handling (#12765)
The iterator `FeatureControlIterator.hasNext()` checks two conditions: 1) whether we have already written the metadata version, and 2) whether the underlying iterator has additional records. However, in `next()`, we also check that the metadata version is at least high enough to include it in the log. When this fails, then we can see an unexpected `NoSuchElementException` if the underlying iterator is empty.

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
2022-10-18 15:30:45 -07:00
José Armando García Sancio 7dc17908de
KAFKA-14300; Generate snapshot after repeated controller resign (#12747)
Setting the `committedBytesSinceLastSnapshot` to 0 when resigning can cause the controller to not generate a snapshot after `snapshotMaxNewRecordBytes` committed bytes have been replayed.

This change fixes that by simply not resetting the counter during resignation. This is correct because the counter tracks the number of committed bytes replayed but not included in the latest snapshot. In other words, reverting the last committed state does not invalidate this value.

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
2022-10-18 15:09:20 -07:00
Alyssa Huang 0cb1d61413
KAFKA-14292; Fix KRaft controlled shutdown delay (#12736)
The `controlledShutDownOffset` is defined as the "offset at which the broker should complete its controlled shutdown, or -1 if the broker is not performing a controlled shutdown". The controller sets this offset to a non-negative integer on receiving a heartbeat from a broker that's in controlled shutdown state. Currently, this offset is being updated and bumped every single time a broker in controlled shutdown mode send a heartbeat, delaying when controlled shutdown can actually complete for the broker. We should only update the offset when it was previously set to -1 to allow controlled shutdown to complete.

Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
2022-10-13 13:29:45 -07:00
Colin Patrick McCabe dac81161db
MINOR; Introduce ImageWriter and ImageWriterOptions (#12715)
This PR adds a new ImageWriter interface which replaces the generic Consumer interface which
accepted lists of records. It is better to do batching in the ImageWriter than to try to deal with
that complexity in the MetadataImage#write functions, especially since batching is not semantically
meaningful in KRaft snapshots. The new ImageWriter interface also supports freeze and close, which
more closely matches the semantics of the underlying Raft classes.

The PR also adds an ImageWriterOptions class which we can use to pass parameters to control how the
new image is written. Right now, the parameters that we are interested in are the target metadata
version (which may be more or less than the original image's version) and a handler function which
is invoked whenever metadata is lost due to the target version.

Convert over the MetadataImage#write function (and associated functions) to use the new ImageWriter
and ImageWriterOptions. In particular, we now have a way to handle metadata losses by invoking
ImageWriterOptions#handleLoss. This allows us to handle writing an image at a lower version, for
the first time. This support is still not enabled externally by this PR, though. That will come in
a future PR.

Get rid of the use of SOME_RECORD_TYPE.highestSupportedVersion() in several places. In general, we
do not want to "silently" change the version of a record that we output, just because a new version
was added. We should be explicit about what record version numbers we are outputting.

Implement ProducerIdsDelta#toString, to make debug logs look better.

Move MockRandom to the server-common package so that other internal broker packages can use it.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2022-10-13 09:56:19 -07:00
Niket 98a3dcb477
KAFKA-14275; KRaft Controllers should crash after failing to apply any metadata record (#12709)
Make all faults in metadata processing on standby controllers be fatal. This is the same behavior-wise as the active controller. This prevents a standby controller from eventually becoming active with incomplete state.

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
2022-10-11 09:46:42 -07:00
Colin Patrick McCabe 1c07095cbd
MINOR: fix indentation and add builders in some KRaft tests (#12720)
Add builders for LocalLogManagerTestEnv and QuorumControllerTestEnv, since the constructor
overloads were starting to get unwieldy.

Make indentation more consistent in QuorumControllerTest. Take advantage of the fact that you can
initialize multiple resources in a Java try-with-resources block to avoid excessive indentation in a few
cases.

Reviewers: José Armando García Sancio <jsancio@apache.org>
2022-10-07 13:53:41 -07:00
Luke Chen 42b311ed44
MINIOR: some typos in javadoc (#12723)
Reviewer: David Jacot <djacot@confluent.io>
2022-10-07 19:59:18 +08:00
Jason Gustafson c5745d2845
MINOR: Add initial property tests for StandardAuthorizer (#12703)
In https://github.com/apache/kafka/pull/12695, we discovered a gap in our testing of `StandardAuthorizer`. We addressed the specific case that was failing, but I think we need to establish a better methodology for testing which incorporates randomized inputs. This patch is a start in that direction. We implement a few basic property tests using jqwik which focus on prefix searching. It catches the case from https://github.com/apache/kafka/pull/12695 prior to the fix. In the future, we can extend this to cover additional operation types, principal matching, etc.

Reviewers: David Arthur <mumrah@gmail.com>
2022-10-04 16:31:43 -07:00
Colin P. McCabe fc786c335c add unit and integration tests 2022-09-28 22:58:39 -07:00
Colin P. McCabe ba89eaf0d7 KAFKA-14265: Prefix ACLs may shadow other prefix ACLs 2022-09-28 17:03:41 -07:00
Colin Patrick McCabe 7496e62434
KAFKA-14259: BrokerRegistration#toString throws an exception, terminating metadata replay (#12681)
Previously, BrokerRegistration#toString sould throw an exception, terminating metadata replay,
because the sorted() method is used on an entry set rather than a key set.

Reviewers: David Arthur <mumrah@gmail.com>
2022-09-23 15:39:50 -07:00
Akhilesh C 6c6b8e2f96
KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads. (#12628)
Fixes an issue with StandardAuthorizer#authorize that allowed inconsistent results. The underlying 
concurrent data structure (ConcurrentSkipListMap) had weak consistency guarantees. This meant
that a concurrent update to the authorizer data could result in the authorize function processing 
ACL updates out of order.

This patch replaces the concurrent data structures with regular non-thread safe equivalents and uses
a read/write lock for thread safety and strong consistency.

Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>, Colin P. McCabe <cmccabe@apache.org>, Luke Chen <showuon@gmail.com>
2022-09-20 16:54:18 -04:00
Colin Patrick McCabe ae4bb0c6fa
KAFKA-14243: Temporarily disable unsafe downgrade (#12664)
Reviewers: David Arthur <mumrah@gmail.com>
2022-09-20 15:32:52 -04:00
Ashmeet Lamba 86645cb40a
KAFKA-14073; Log the reason for snapshot (#12414)
When a snapshot is taken it is due to either of the following reasons -

    Max bytes were applied
    Metadata version was changed

Once the snapshot process is started, it will log the reason that initiated the process.

Updated existing tests to include code changes required to log the reason. I was not able to check the logs when running tests - could someone guide me on how to enable logs when running a specific test case.

Reviewers: dengziming <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@apache.org>
2022-09-13 10:03:47 -07:00
Colin Patrick McCabe 81b71c06f3
KAFKA-14204: QuorumController must correctly handle overly large batches (#12595)
Originally, the QuorumController did not try to limit the number of records in a batch that it sent
to the Raft layer.  This caused two problems. Firstly, we were not correctly handling the exception
that was thrown by the Raft layer when a batch of records was too large to apply atomically. This
happened because the Raft layer threw an exception which was a subclass of ApiException. Secondly,
by letting the Raft layer split non-atomic batches, we were not able to create snapshots at each of
the splits. This led to O(N) behavior during controller failovers.

This PR fixes both of these issues by limiting the number of records in a batch. Atomic batches
that are too large will fail with a RuntimeException which will cause the active controller to
become inactive and revert to the last committed state. Non-atomic batches will be split into
multiple batches with a fixed number of records in each.

Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@gmail.com>
2022-09-08 14:21:29 -07:00
Ron Dagostino ef65b6e566
KAFKA-14195: Fix KRaft AlterConfig policy usage for Legacy/Full case (#12578)
#12374 adjusted the invocation of the alter configs policy check in KRaft to match the behavior in ZooKeeper, which is to only provide the configs that were explicitly sent in the request. While the code was correct for the incremental alter configs case, the code actually included the implicit deletions for the legacy/non-incremental alter configs case, and those implicit deletions are not included in the ZooKeeper-based invocation. This patch adds a test to check for this and adjusts ConfigurationControlManager code so that the test passes -- the adjusted test is confirmed to fail locally otherwise. We also add a log statement to emit any unexpected stack traces in the alter config code path.

Reviewers: José Armando García Sancio <jsancio@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2022-09-01 15:59:17 -07:00
José Armando García Sancio f83c6f2da4
KAFKA-14183; Cluster metadata bootstrap file should use header/footer (#12565)
The boostrap.checkpoint files should include a control record batch for
the SnapshotHeaderRecord at the start of the file. It should also
include a control record batch for the SnapshotFooterRecord at the end
of the file.

The snapshot header record is important because it versions the rest of
the bootstrap file.

Reviewers: David Arthur <mumrah@gmail.com>
2022-08-27 19:11:06 -07:00
Colin Patrick McCabe f0f918b242
KAFKA-14177: Correctly support older kraft versions without FeatureLevelRecord (#12513)
The main changes here are ensuring that we always have a metadata.version record in the log, making
˘sure that the bootstrap file can be used for records other than the metadata.version record (for
example, we will want to put SCRAM initialization records there), and fixing some bugs.

If no feature level record is in the log and the IBP is less than 3.3IV0, then we assume the minimum KRaft
version for all records in the log.

Fix some issues related to initializing new clusters. If there are no records in the log at all,
then insert the bootstrap records in a single batch. If there are records, but no metadata version,
process the existing records as though they were metadata.version 3.3IV0 and then append a metadata
version record setting version 3.3IV0.  Previously, we were not clearly distinguishing between the
case where the metadata log was empty, and the case where we just needed to add a metadata.version
record.

Refactor BootstrapMetadata into an immutable class which contains a 3-tuple of metadata version,
record list, and source. The source field is used to log where the bootstrap metadata was obtained
from. This could be a bootstrap file, the static configuration, or just the software defaults.
Move the logic for reading and writing bootstrap files into BootstrapDirectory.java.

Add LogReplayTracker, which tracks whether the log is empty.

Fix a bug in FeatureControlManager where it was possible to use a "downgrade" operation to
transition to a newer version. Do not store whether we have seen a metadata version or not in
FeatureControlManager, since that is now handled by LogReplayTracker.

Introduce BatchFileReader, which is a simple way of reading a file containing batches of snapshots
that does not require spawning a thread. Rename SnapshotFileWriter to BatchFileWriter to be
consistent, and to reflect the fact that bootstrap files aren't snapshots.

QuorumController#processBrokerHeartbeat: add an explanatory comment.

Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
2022-08-25 18:12:31 -07:00
dengziming 19581effbf
KAFKA-13850: Show missing record type in MetadataShell (#12103)
AccessControlEntryRecord and RemoveAccessControlEntryRecord are added in KIP-801, FeatureLevelRecord was added in KIP-778, and BrokerRegistrationChangeRecord was added in KIP-841, and NoOpRecord was added in KIP-835, I added these 5 record types in MetadataShell.

 Reviewers: Luke Chen <showuon@gmail.com>
2022-08-25 14:09:01 +08:00
David Arthur 5eff8592cc
KAFKA-14178 Don't record queue time for deferred events (#12551) 2022-08-24 10:01:48 -04:00
Divij Vaidya 9aef992118
MINOR: Catch InvocationTargetException explicitly and propagate underlying cause (#12230)
Catch InvocationTargetException explicitly and propagate underlying cause

Reviewers: Ismael Juma <mlists@juma.me.uk>, Matthew de Detrich <mdedetrich@gmail.com>, Kvicii, Luke Chen <showuon@gmail.com>
2022-08-23 17:34:39 +08:00
Akhilesh C 5f4af5f7d1
KAFKA-14170: Fix NPE in the deleteTopics() code path of KRaft Controller (#12533)
Fix a bug in ReplicationControlManager where we got a NullPointerException when removing a topic
with no offline replicas, and there were other topics that did have offline replicas.

Fix an issue in MetadataDelta#replay where we were replaying RemoveTopicRecord twice.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>
2022-08-18 17:14:17 -07:00
Jason Gustafson 5990471b8c
KAFKA-14154; Kraft controller should return NOT_CONTROLLER if request epoch is ahead (#12514)
Similar to https://github.com/apache/kafka/pull/12506. For the Kraft controller, we should return NOT_CONTROLLER if the leader/partition epoch in the request is ahead of the controller. 

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
2022-08-15 11:34:29 -07:00
dengziming 50e5b32a6d
KAFKA-13959: Controller should unfence Broker with busy metadata log (#12274)
The reason for KAFKA-13959 is a little complex, the two keys to this problem are:

KafkaRaftClient.MAX_FETCH_WAIT_MS==MetadataMaxIdleIntervalMs == 500ms. We rely on fetchPurgatory to complete a FetchRequest, in details, if FetchRequest.fetchOffset >= log.endOffset, we will wait for 500ms to send a FetchResponse. The follower needs to send one more FetchRequest to get the HW.

Here are the event sequences:

1. When starting the leader(active controller) LEO=m+1(m is the offset of the last record), leader HW=m(because we need more than half of the voters to reach m+1)
2. Follower (standby controller) and observer (broker) send FetchRequest(fetchOffset=m)
    2.1. leader receives FetchRequest, set leader HW=m and waits 500ms before send FetchResponse
    2.2. leader send FetchResponse(HW=m)
    3.3 broker receive FetchResponse(HW=m), set metadataOffset=m.
3. Leader append NoOpRecord, LEO=m+2. leader HW=m
4. Looping 1-4

If we change MAX_FETCH_WAIT_MS=200 (less than half of MetadataMaxIdleIntervalMs), this problem can be solved temporarily.

We plan to improve this problem in 2 ways, firstly, in this PR, we change the controller to unfence a broker when the broker's high-watermark has reached the broker registration record for that broker. Secondly, we will propagate the HWM to the replicas as quickly as possible in KAFKA-14145.

Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
2022-08-12 09:06:24 -07:00
Niket Goel ac64693434 KAFKA-14114: Add Metadata Error Related Metrics
This PR adds in 3 metrics as described in KIP-859:
 kafka.server:type=broker-metadata-metrics,name=metadata-load-error-count
 kafka.server:type=broker-metadata-metrics,name=metadata-apply-error-count
 kafka.controller:type=KafkaController,name=MetadataErrorCount

These metrics are incremented by fault handlers when the appropriate fault happens. Broker-side
load errors happen in BrokerMetadataListener. Broker-side apply errors happen in the
BrokerMetadataPublisher. The metric on the controller is incremented when the standby controller
(not active) encounters a metadata error.

In BrokerMetadataPublisher, try to limit the damage caused by an exception by introducing more
catch blocks. The only fatal failures here are those that happen during initialization, when we
initialize the manager objects (these would also be fatal in ZK mode).

In BrokerMetadataListener, try to improve the logging of faults, especially ones that happen when
replaying a snapshot. Try to limit the damage caused by an exception.

Replace MetadataFaultHandler with LoggingFaultHandler, which is more flexible and takes a Runnable
argument. Add LoggingFaultHandlerTest.

Make QuorumControllerMetricsTest stricter. Fix a bug where we weren't cleaning up some metrics from
the yammer registry on close in QuorumControllerMetrics.

Co-author: Colin P. McCabe <cmccabe@apache.org>
2022-08-09 15:22:15 -07:00
Colin Patrick McCabe e67711af71
MINOR: BrokerMetadataSnapshotter must avoid exceeding batch size (#12486)
BrokerMetadataSnapshotter should split up record lists that exceed the batch size.

Reviewers: David Arthur <mumrah@gmail.com>
2022-08-09 13:24:24 -07:00
Niket 48caba9340
KAFKA-14104; Add CRC validation when iterating over Metadata Log Records (#12457)
This commit adds a check to ensure the RecordBatch CRC is valid when
iterating over a Batch of Records using the RecordsIterator. The
RecordsIterator is used by both Snapshot reads and Log Records reads in
Kraft. The check can be turned off by a class parameter and is on by default.

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
2022-08-08 15:03:04 -07:00
Colin Patrick McCabe 555744da70
KAFKA-14124: improve quorum controller fault handling (#12447)
Before trying to commit a batch of records to the __cluster_metadata log, the active controller
should try to apply them to its current in-memory state. If this application process fails, the
active controller process should exit, allowing another node to take leadership. This will prevent
most bad metadata records from ending up in the log and help to surface errors during testing.

Similarly, if the active controller attempts to renounce leadership, and the renunciation process
itself fails, the process should exit. This will help avoid bugs where the active controller
continues in an undefined state.

In contrast, standby controllers that experience metadata application errors should continue on, in
order to avoid a scenario where a bad record brings down the whole controller cluster.  The
intended effect of these changes is to make it harder to commit a bad record to the metadata log,
but to continue to ride out the bad record as well as possible if such a record does get committed.

This PR introduces the FaultHandler interface to implement these concepts. In junit tests, we use a
FaultHandler implementation which does not exit the process. This allows us to avoid terminating
the gradle test runner, which would be very disruptive. It also allows us to ensure that the test
surfaces these exceptions, which we previously were not doing (the mock fault handler stores the
exception).

In addition to the above, this PR fixes a bug where RaftClient#resign was not being called from the
renounce() function. This bug could have resulted in the raft layer not being informed of an active
controller resigning.

Reviewers: David Arthur <mumrah@gmail.com>
2022-08-04 22:49:45 -07:00
David Arthur add7cd85ba
KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483) 2022-08-04 15:09:08 -04:00
Colin Patrick McCabe 0c4da23098
KAFKA-14129: KRaft must check manual assignments for createTopics are contiguous (#12467)
KRaft should validate that manual assignments given to createTopics are contiguous. In other words,
they must start with partition 0, and progress through 1, 2, 3, etc. ZK mode does this, but KRaft
mode previously did not. Also fix a null pointer exception when the placement for partition 0
was not specified.

Convert over AddPartitionsTest to use KRaft. This PR converts all of the test except for some of
the placement logic tests, which will need to be redone for KRaft mode in a future change.

Fix null pointer exception in KRaftMetadataCache#getPartitionInfo.  Specifically, we should not
assume that the partition will be found in the hash map. This is another case where we had
"Some(x)" but it should be "Option(x)."

Fix a potential null pointer exception in BrokerServer#state.

Reviewers: dengziming <dengziming1993@gmail.com>, Jason Gustafson <jason@confluent.io>
2022-08-02 15:39:47 -07:00
dengziming f26842ca0b
MINOR; Use right enum value for broker registration change (#12236)
The code used BrokerRegistrationFencingChange.FENCE when unfencing a broker and used BrokerRegistrationFencingChange.UNFENCE when fencing a broker, this is confusing. This commit flips the values of the two enums and changes their usage at all of the call sites.

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
2022-08-02 05:38:52 -07:00
David Arthur c020c94e04
KAFKA-14039 Fix AlterConfigPolicy usage in KRaft (#12374)
Only pass configs from the request to the AlterConfigPolicy. This changes the KRaft usage of the AlterConfigPolicy to match the usage in ZK mode.

Reviewers: Jason Gustafson <jason@confluent.io>
2022-07-15 15:48:35 -04:00
Divij Vaidya 5e4c8f704c
KAFKA-13943; Make `LocalLogManager` implementation consistent with the `RaftClient` contract (#12224)
Fixes two issues in the implementation of `LocalLogManager`:

- As per the interface contract for `RaftClient.scheduleAtomicAppend()`, it should throw a `NotLeaderException` exception when the provided current leader epoch does not match the current epoch. However, the current `LocalLogManager`'s implementation of the API returns a LONG_MAX instead of throwing an exception. This change fixes the behaviour and makes it consistent with the interface contract.
-  As per the interface contract for `RaftClient.resign(epoch)`if the parameter epoch does not match the current epoch, this call will be ignored. But in the current `LocalLogManager` implementation the leader epoch might change when the thread is waiting to acquire a lock on `shared.tryAppend()` (note that tryAppend() is a synchronized method). In such a case, if a NotALeaderException is thrown (as per code change in above), then resign should be ignored.

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Tom Bentley <tbentley@redhat.com>, Jason Gustafson <jason@confluent.io>
2022-07-05 20:08:28 -07:00
Niket c19398ee66
KAFKA-14035; Fix NPE in `SnapshottableHashTable::mergeFrom()` (#12371)
The NPE causes the kraft controller to be in an inconsistent state. 

Reviewers: Jason Gustafson <jason@confluent.io>
2022-06-30 21:03:54 -07:00
David Arthur c6c9da02a8
KAFKA-13966 Prepend bootstrap metadata to controller queue (#12269)
Also fixes flaky QuorumControllerTest#testInvalidBootstrapMetadata

Reviewers: Jason Gustafson <jason@confluent.io>
2022-06-23 11:29:21 -04:00
Jason Gustafson ead6645123
MINOR: Add ineligible replica reason to log message (#12328)
It's useful if the message about ineligible replicas explains the reason the replica is ineligible.

Reviewers: David Jacot <djacot@confluent.io>
2022-06-22 13:27:47 -07:00
Divij Vaidya 17637c4ad5
MINOR: Clean up tmp files created by tests (#12233)
There are a bunch of tests which do not clean up after themselves. This leads to
accumulation of files in the tmp directory of the system on which the tests are
running. 

This code change fixes some of the main culprit tests which leak the files in the
temporary directory.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Kvicii <kvicii.yu@gmail.com>
2022-06-16 16:46:07 -07:00
David Jacot f83d95d9a2
KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 2) (#12181)
This path implements [KIP-841](https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft). Specifically, it implements the following:
* It introduces INELIGIBLE_REPLICA and NEW_LEADER_ELECTED error codes.
* The KRaft controller validates the new ISR provided in the AlterPartition request and rejects the call if any replica in the new ISR is not eligible to join the the ISR - e.g. when fenced or shutting down. The leader reverts to the last committed ISR when its request is rejected due to this.
* The partition leader also verifies that a replica is eligible before trying to add it back to the ISR. If it is not eligible, the ISR expansion is not triggered at all.
* Updates the AlterPartition API to use topic ids. Updates the AlterPartition manger to handle topic names/ids. Updates the ZK controller and the KRaft controller to handle topic names/ids depending on the version of the request used.

Reviewers: Artem Livshits <84364232+artemlivshits@users.noreply.github.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
2022-06-14 13:12:45 +02:00
David Arthur cc384054c6
KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250)
* Set the minimum supported MetadataVersion to 3.0-IV1
* Remove MetadataVersion.UNINITIALIZED
* Relocate RPC version mapping for fetch protocols into MetadataVersion
* Replace static IBP calls with dynamic calls to MetadataCache

A side effect of removing the UNINITIALIZED metadata version is that the FeatureControlManager and FeatureImage will initialize themselves with the minimum KRaft version (3.0-IV1).

The rationale for setting the minimum version to 3.0-IV1 is so that we can avoid any cases of KRaft mode running with an old log message format (KIP-724 was introduced in 3.0-IV1). As a side-effect of increasing this minimum version, the feature level values decreased by one.

Reviewers: Jason Gustafson <jason@confluent.io>, Jun Rao <junrao@gmail.com>
2022-06-13 14:23:28 -04:00
Christo Lolov 6c90f3335e
KAFKA-13947: Use %d formatting for integers rather than %s (#12267)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>, Kvicii <kvicii.yu@gmail.com>
2022-06-10 13:55:52 +02:00
José Armando García Sancio 21490af989
MINOR; Test last committed record offset for Controllers (#12249)
As part of KIP-835, LastCommittedRecordOffset was added to the
KafkaController metric type. Make sure to test that metric.

Reviewers: Jason Gustafson <jason@confluent.io>
2022-06-08 10:45:04 -07:00
David Jacot 151ca12a56
KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (#12240)
This PR implements the first part of KIP-841. Specifically, it implements the following:

1. Adds a new metadata version.
2. Adds the InControlledShutdown field to the BrokerRegistrationRecord and BrokerRegistrationChangeRecord and bump their versions. The newest versions are only used if the new metadata version is enabled.
3. Writes a BrokerRegistrationChangeRecord with InControlledShutdown set when a broker requests a controlled shutdown.
4. Ensures that fenced and in controlled shutdown replicas are not picked as leaders nor included in the ISR.
5. Adds or extends unit tests.

Reviewes: José Armando García Sancio <jsancio@users.noreply.github.com>, dengziming <dengziming1993@gmail.com>, David Arthur <mumrah@gmail.com>
2022-06-07 10:37:20 -07:00
Colin Patrick McCabe 65b4374203
MINOR: implement BrokerRegistrationChangeRecord (#12195)
Implement BrokerRegistrationChangeRecord as specified in KIP-746. This is a more flexible record than the
single-purpose Fence / Unfence records.

Reviewers: José Armando García Sancio <jsancio@gmail.com>, dengziming <dengziming1993@gmail.com>
2022-06-01 16:33:01 -07:00
Colin Patrick McCabe 0ca9cd4d2d
MINOR: Several fixes and improvements for FeatureControlManager (#12207)
This PR fixes a bug where FeatureControlManager#replay(FeatureLevelRecord) was throwing an
exception if not all controllers in the quorum supported the feature being applied. While we do
want to validate this, it needs to be validated earlier, before the record is committed to the log.
Once the record has been committed to the log it should always be applied if the current controller
supports it.

Fix another bug where removing a feature was not supported once it had been configured. Note that
because we reserve feature level 0 for "feature not enabled", we don't need to use
Optional<VersionRange>; we can just return a range of 0-0 when the feature is not supported.

Allow the metadata version to be downgraded when UpgradeType.UNSAFE_DOWNGRADE has been set.
Previously we were unconditionally denying this even when this was set.

Add a builder for FeatureControlManager, so that we can easily add new parameters to the
constructor in the future. This will also be useful for creating FeatureControlManagers that are
initialized to a specific MetadataVersion.

Get rid of RemoveFeatureLevelRecord, since it's easier to just issue a FeatureLevelRecord with
the level set to 0.

Set metadata.max.idle.interval.ms to 0 in RaftClusterSnapshotTest for more predictability.

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
2022-06-01 16:09:38 -07:00
José Armando García Sancio 7d1b0926fa
KAFKA-13883: Implement NoOpRecord and metadata metrics (#12183)
Implement NoOpRecord as described in KIP-835. This is controlled by the new
metadata.max.idle.interval.ms configuration.

The KRaft controller schedules an event to write NoOpRecord to the metadata log if the metadata
version supports this feature. This event is scheduled at the interval defined in
metadata.max.idle.interval.ms. Brokers and controllers were improved to ignore the NoOpRecord when
replaying the metadata log.

This PR also addsffour new metrics to the KafkaController metric group, as described KIP-835.

Finally, there are some small fixes to leader recovery. This PR fixes a bug where metadata version
3.3-IV1 was not marked as changing the metadata. It also changes the ReplicaControlManager to
accept a metadata version supplier to determine if the leader recovery state is supported.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2022-06-01 10:48:24 -07:00
David Arthur 4efdc1a310
MINOR: Consolidate FinalizedFeatureCache into MetadataCache (#12214)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2022-05-26 16:25:58 -04:00
andymg3 4878653016
MINOR: Use parameterized logging in StandardAuthorizer and StandardAuthorizerData (#12192)
This updates StandardAuthorizer and StandardAuthorizerData to use parameterized logging per the SLF4J recommendation (see https://www.slf4j.org/faq.html). This also removes a couple if statements that explicitly check if trace is enabled, but the logger should handle not publishing the message and not constructing the String if trace is not enabled.

Reviewers: Jason Gustafson <jason@confluent.io>
2022-05-21 18:14:02 -07:00
andymg3 a8e3a259e5
KAFKA-13889: Fix AclsDelta handling of REMOVE_ACCESS_CONTROL_ENTRY_RECORD (#12160)
AclsDelta stores the pending deletion in the changes Map. This could override a creation that might have just happened. This is an issue because in BrokerMetadataPublisher this results in us making a removeAcl call which finally results in StandardAuthorizerData trying to remove an ACL that doesn't exist which throws an exception. If the ACCESS_CONTROL_ENTRY_RECORD event never got processed by BrokerMetadataPublisher then the ACL wont be in the Map yet.

The fix here is to remove the entry from the changes Map if the ACL doesnt exist in the image yet.

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
2022-05-19 16:24:34 -07:00
dengziming 6380652a5a
KAFKA-13863; Prevent null config value when create topic in KRaft mode (#12109)
This patch ensures consistent handling of null-valued topic configs between the zk and kraft controller. Prior to this patch, we returned INVALID_REQUEST in zk mode and it was not an error in kraft. After this patch, we return INVALID_CONFIG consistently for this case.

Reviewers: Jason Gustafson <jason@confluent.io>
2022-05-19 09:46:48 -07:00
David Arthur 1135f22eaf
KAFKA-13830 MetadataVersion integration for KRaft controller (#12050)
This patch builds on #12072 and adds controller support for metadata.version. The kafka-storage tool now allows a
user to specify a specific metadata.version to bootstrap into the cluster, otherwise the latest version is used.

Upon the first leader election of the KRaft quroum, this initial metadata.version is written into the metadata log. When
writing snapshots, a FeatureLevelRecord for metadata.version will be written out ahead of other records so we can
decode things at the correct version level.

This also includes additional validation in the controller when setting feature levels. It will now check that a given
metadata.version is supportable by the quroum, not just the brokers.

Reviewers: José Armando García Sancio <jsancio@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>, Alyssa Huang <ahuang@confluent.io>
2022-05-18 12:08:36 -07:00
David Arthur 62ba4d3d4e
MINOR: Remove extraneous code in LocalLogManager (#12168)
Reviewers: Kvicii <Karonazaba@gmail.com>, dengziming <dengziming1993@gmail.com>, Divij Vaidya <divijvaidya13@gmail.com>
2022-05-18 10:58:08 -04:00
Colin Patrick McCabe fa59be4e77
KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading (#11969)
Since the StandardAuthorizer relies on the metadata log to store its ACLs, we need to be sure that
we have the latest metadata before allowing the authorizer to be used. However, if the authorizer
is not usable for controllers in the cluster, the latest metadata cannot be fetched, because
inter-node communication cannot occur. In the initial commit which introduced StandardAuthorizer,
we punted on the loading issue by allowing the authorizer to be used immediately. This commit fixes
that by implementing early.start.listeners as specified in KIP-801. This will allow in superusers
immediately, but throw the new AuthorizerNotReadyException if non-superusers try to use the
authorizer before StandardAuthorizer#completeInitialLoad is called.

For the broker, we call StandardAuthorizer#completeInitialLoad immediately after metadata catch-up
is complete, right before unfencing. For the controller, we call
StandardAuthorizer#completeInitialLoad when the node has caught up to the high water mark of the
cluster metadata partition.

This PR refactors the SocketServer so that it creates the configured acceptors and processors in
its constructor, rather than requiring a call to SocketServer#startup A new function,
SocketServer#enableRequestProcessing, then starts the threads and begins listening on the
configured ports. enableRequestProcessing uses an async model: we will start the acceptor and
processors associated with an endpoint as soon as that endpoint's authorizer future is completed.

Also fix a bug where the controller and listener were sharing an Authorizer when in co-located
mode, which was not intended.

Reviewers: Jason Gustafson <jason@confluent.io>
2022-05-12 14:48:33 -07:00
andymg3 040b11d705
KAFKA-13892: Fix bug where multiple remove records are generated for one ACL
Fix a bug where multiple remove records could be generated for a single ACL. Previously, this happened
if the user submitted multiple filters to deleteAcls, and more than one matched.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
2022-05-10 15:26:57 -07:00
dengziming 0c1cde1080
KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode (#12108)
We can append/subtract multiple config values in kraft mode using the `IncrementalAlterConfig` RPC. For example: append/subtract topic config "cleanup.policy" with value="delete,compact" will end up treating "delete,compact" as a value not 2 values. This patch fixes the problem. Additionally, it update the zk logic to correctly handle duplicate additions.

Reviewers: Akhilesh Chaganti <akhileshchg@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
2022-05-10 12:41:17 -07:00
Akhilesh Chaganti 430f75ba22
KAFKA-13861; Fix the validateOnly behavior for CreatePartitions requests in KRaft mode (#12106)
The KRaft implementation of the `CreatePartitions` ignores the `validateOnly` flag in the
request and creates the partitions if the validations are successful. Fixed the behavior
not to create partitions upon validation if the `validateOnly` flag is true.

Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, dengziming <dengziming1993@gmail.com>, Jason Gustafson <jason@confluent.io>
2022-05-04 10:31:46 -07:00
David Jacot 4a367577b9
MINOR: Fix minor typos in `PartitionChangeBuilder` (#12101)
Reviewers: Luke Chen <showuon@gmail.com>
2022-04-28 11:19:34 +08:00
David Jacot 7c8c65fc54
MINOR: Rename `ZkVersion` to `PartitionEpoch` (#12071)
This patch does some initial cleanups in the context of KAFKA-13790. Mainly, it renames `ZkVersion` field to `PartitionEpoch` in the `LeaderAndIsrRequest`, the `LeaderAndIsr` and the `Partition`.

Reviewers: Jason Gustafson <jason@confluent.io>, dengziming <dengziming1993@gmail.com>
2022-04-22 20:38:17 +02:00
Colin Patrick McCabe d480c4aa6e
KAFKA-13841: Fix a case where we were unable to place on fenced brokers in KRaft mode (#12075)
This PR fixes a case where we were unable to place on fenced brokers In KRaft mode. Specifically,
if we had a broker registration in the metadata log, but no associated heartbeat, previously the
HeartbeatManager would not track the fenced broker. This PR fixes this by adding this logic to the
metadata log replay path in ClusterControlManager.

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
2022-04-21 14:58:02 -07:00
José Armando García Sancio 4380eae7ce
MINOR; Fix partition change record noop check (#12073)
When LeaderRecoveryState was added to the PartitionChangeRecord, the
check for being a noop was not updated. This commit fixes that and
improves the associated test to avoid this oversight in the future.

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
2022-04-21 09:05:46 -07:00
Colin Patrick McCabe 1521813a3a
KAFKA-13807: Fix incrementalAlterConfig and refactor some things (#12033)
Ensure that we can set log.flush.interval.ms at the broker or cluster level via
IncrementalAlterConfigs. This was broken by KAFKA-13749, which added log.flush.interval.ms as the
second synonym rather than the first. Add a regression test to DynamicConfigChangeTest.

Create ControllerRequestContext and pass it to every controller API. This gives us a uniform way to
pass through information like the deadline (if there is one) and the Kafka principal which is
making the request (in the future we will want to log this information).

In ControllerApis, enforce a timeout for broker heartbeat requests which is equal to the heartbeat
request interval, to avoid heartbeats piling up on the controller queue. This should have been done
previously, but we overlooked it.

Add a builder for ClusterControlManager and ReplicationControlManager to avoid the need to deal
with a lot of churn (especially in test code) whenever a new constructor parameter gets added for
one of these.

In ControllerConfigurationValidator, create a separate function for when we just want to validate
that a ConfigResource is a valid target for DescribeConfigs. Previously we had been re-using the
validation code for IncrementalAlterConfigs, but this was messy.

Split out the replica placement code into a separate package and reorganize it a bit.

Reviewers: David Arthur <mumrah@gmail.com
2022-04-15 16:07:23 -07:00
David Arthur 55ff5d3603
KAFKA-13823 Feature flag changes from KIP-778 (#12036)
This PR includes the changes to feature flags that were outlined in KIP-778.  Specifically, it
changes UpdateFeatures and FeatureLevelRecord to remove the maximum version level. It also adds
dry-run to the RPC so the controller can actually attempt the upgrade (rather than the client). It
introduces an upgrade type enum, which supersedes the allowDowngrade boolean. Because
FeatureLevelRecord was unused previously, we do not need to introduce a new version.

The kafka-features.sh tool was overhauled in KIP-778 and now includes the describe, upgrade,
downgrade, and disable sub-commands.  Refer to
[KIP-778](https://cwiki.apache.org/confluence/display/KAFKA/KIP-778%3A+KRaft+Upgrades) for more
details on the new command structure.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>
2022-04-14 10:04:32 -07:00
dengziming 87aa8259dd KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode #11910
In ZK mode, the topic "foo_bar" will conflict with "foo.bar" because of limitations in metric
names. We should implement this in KRaft mode.  This PR also changes TopicCommandIntegrationTest to
support KRaft mode.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2022-04-13 11:59:29 -07:00
Jason Gustafson f97646488c
KAFKA-13651; Add audit logging to `StandardAuthorizer` (#12031)
This patch adds audit support through the kafka.authorizer.logger logger to StandardAuthorizer. It
follows the same conventions as AclAuthorizer with a similarly formatted log message. When
logIfAllowed is set in the Action, then the log message is at DEBUG level; otherwise, we log at
trace. When logIfDenied is set, then the log message is at INFO level; otherwise, we again log at
TRACE.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2022-04-13 10:33:15 -07:00
Colin Patrick McCabe 62ea4c46a9
KAFKA-13749: CreateTopics in KRaft must return configs (#11941)
Previously, when in KRaft mode, CreateTopics did not return the active configurations for the
topic(s) it had just created. This PR addresses that gap. We will now return these topic
configuration(s) when the user has DESCRIBE_CONFIGS permission. (In the case where the user does
not have this permission, we will omit the configurations and set TopicErrorCode. We will also omit
the number of partitions and replication factor data as well.)

For historical reasons, we use different names to refer to each topic configuration when it is set
in the broker context, as opposed to the topic context. For example, the topic configuration
"segment.ms" corresponds to the broker configuration "log.roll.ms". Additionally, some broker
configurations have synonyms. For example, the broker configuration "log.roll.hours" can be used to
set the log roll time instead of "log.roll.ms". In order to track all of this, this PR adds a
table in LogConfig.scala which maps each topic configuration to an ordered list of ConfigSynonym
classes. (This table is then passed to KafkaConfigSchema as a constructor argument.)

Some synonyms require transformations. For example, in order to convert from "log.roll.hours" to
"segment.ms", we must convert hours to milliseconds. (Note that our assumption right now is that
topic configurations do not have synonyms, only broker configurations. If this changes, we will
need to add some logic to handle it.)

This PR makes the 8-argument constructor for ConfigEntry public. We need this in order to make full
use of ConfigEntry outside of the admin namespace. This change is probably inevitable in general
since otherwise we cannot easily test the output from various admin APIs in junit tests outside the
admin package.

Testing:

This PR adds PlaintextAdminIntegrationTest#testCreateTopicsReturnsConfigs. This test validates
some of the configurations that it gets back from the call to CreateTopics, rather than just checking
if it got back a non-empty map like some of the existing tests. In order to test the
configuration override logic, testCreateDeleteTopics now sets up some custom static and dynamic
configurations.

In QuorumTestHarness, we now allow tests to configure what the ID of the controller should be. This
allows us to set dynamic configurations for the controller in testCreateDeleteTopics. We will have
a more complete fix for setting dynamic configuations on the controller later.

This PR changes ConfigurationControlManager so that it is created via a Builder. This will make it
easier to add more parameters to its constructor without having to update every piece of test code
that uses it. It will also make the test code easier to read.

Reviewers: David Arthur <mumrah@gmail.com>
2022-04-01 10:50:25 -07:00
Jason Gustafson b2cb6caa1e
MINOR: Move `KafkaYammerMetrics` to server-common (#11970)
With major server components like the new quorum controller being moved outside of the `core` module, it is useful to have shared dependencies moved into `server-common`. An example of this is Yammer metrics which server components still rely heavily upon. All server components should have access to the default registry used by the broker so that new metrics can be registered and metric naming conventions should be standardized. This is particularly important in KRaft where we are attempting to recreate identically named metrics in the controller context. This patch takes a step in this direction. It moves `KafkaYammerMetrics` into `server-common` and it implements
standard metric naming utilities there. 

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
2022-03-30 13:59:22 -07:00
dengziming d449f850e1
MINOR: show LogRecoveryState in MetadataShell and fix log message
Show the LeaderRecoveryState in MetadataShell.

Fix a case where we were comparing a Byte type with an enum type.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2022-03-21 14:33:51 -07:00
José Armando García Sancio 8d6968e832
KAFKA-13682; KRaft Controller auto preferred leader election (#11893)
Implement auto leader rebalance for KRaft by keeping track of the set of topic partitions which have a leader that is not the preferred replica. If this set is non-empty then schedule a leader balance event for the replica control manager.

When applying PartitionRecords and PartitionChangeRecords to the ReplicationControlManager, if the elected leader is not the preferred replica then remember this topic partition in the set of imbalancedPartitions.

Anytime the quorum controller processes a ControllerWriteEvent it schedules a rebalance operation if the there are no pending rebalance operations, the feature is enabled and there are imbalance partitions.

This KRaft implementation only supports the configurations properties auto.leader.rebalance.enable and leader.imbalance.check.interval.seconds. The configuration property leader.imbalance.per.broker.percentage is not supported and ignored.

Reviewers: Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
2022-03-18 14:30:52 -07:00
José Armando García Sancio 52621613fd
KAFKA-13587; Implement leader recovery for KIP-704 (#11733)
Implementation of the protocol for starting and stopping leader recovery after an unclean leader election. This includes the management of state in the controllers (legacy and KRaft) and propagating this information to the brokers. This change doesn't implement log recovery after an unclean leader election.

Protocol Changes
================

For the topic partition state znode, the new field "leader_recovery_state" was added. If the field is missing the value is assumed to be RECOVERED.

ALTER_PARTITION was renamed from ALTER_ISR. The CurrentIsrVersion field was renamed to PartitionEpoch. The new field LeaderRecoveryState was added.

The new field LeaderRecoverState was added to the LEADER_AND_ISR request. The inter broker protocol version is used to determine which version to send to the brokers.

A new tagged field for LeaderRecoveryState was added to both the PartitionRecord and PartitionChangeRecord.

Controller
==========

For both the KRaft and legacy controller the LeaderRecoveryState is set to RECOVERING, if the leader was elected out of the ISR, also known as unclean leader election. The controller sets the state back to RECOVERED after receiving an ALTER_PARTITION request with version 0, or with version 1 and with the LeaderRecoveryState set to RECOVERED.

Both controllers preserve the leader recovery state even if the unclean leader goes offline and comes back online before an RECOVERED ALTER_PARTITION is sent.

The controllers reply with INVALID_REQUEST if the ALTER_PARTITION either:

    1. Attempts to increase the ISR while the partition is still RECOVERING
    2. Attempts to change the leader recovery state to RECOVERING from a RECOVERED state.

Topic Partition Leader
======================

The topic partition leader doesn't implement any log recovery in this change. The topic partition leader immediately marks the partition as RECOVERED and sends that state in the next ALTER_PARTITION request.

Reviewers: Jason Gustafson <jason@confluent.io>
2022-03-18 09:24:11 -07:00
Colin Patrick McCabe bda5c34b03
MINOR: refactor how ConfigurationControl checks for resource existence (#11835)
ConfigurationControl methods should take a boolean indicating whether the resource is newly
created, rather than taking an existence checker object. The boolean is easier to understand. Also
add a unit test of existing checking failing (and succeeding).

Reviewers: Kirk True <kirk@mustardgrain.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
2022-03-15 12:50:53 -07:00
Colin Patrick McCabe 07553d13f7
MINOR: create KafkaConfigSchema and TimelineObject (#11809)
Create KafkaConfigSchema to encapsulate the concept of determining the types of configuration keys.
This is useful in the controller because we can't import KafkaConfig, which is part of core. Also
introduce the TimelineObject class, which is a more generic version of TimelineInteger /
TimelineLong.

Reviewers: David Arthur <mumrah@gmail.com>
2022-03-02 14:26:31 -08:00
Jason Gustafson 5f91aa7b4c
KAFKA-13698; KRaft authorizer should use host address instead of name (#11807)
Use `InetAddress.getHostAddress` in `StandardAuthorizer` instead of `InetAddress.getHostName`.

Reviewers: Colin Patrick McCabe <cmccabe@confluent.io>
2022-02-26 10:52:34 -08:00
Jason Gustafson 2c90447a59
KAFKA-13697; KRaft authorizer should support AclOperation.ALL (#11806)
KRaft authorizer should support AclOperation.ALL correctly.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2022-02-25 15:43:21 -08:00
Jason Gustafson fc20c551d6
MINOR: Clearer field names for ProducerIdsRecord and related classes (#11747)
The current naming of the fields in `ProducerIdsRecord` is a little confusing in regard to whether the block range was inclusive or exclusive. This patch tries to improve naming to make this clearer. In the record class, instead of `ProducerIdsEnd`, we use `NextProducerId`. We have also updated related classes such as `ProducerIdsBlock.java` with similar changes.

Reviewers: dengziming <dengziming1993@gmail.com>, David Arthur <mumrah@gmail.com>
2022-02-11 16:14:31 -08:00
Colin Patrick McCabe d35283f011
KAFKA-13646; Implement KIP-801: KRaft authorizer (#11649)
Currently, when using KRaft mode, users still have to have an Apache ZooKeeper instance if they want to use AclAuthorizer. We should have a built-in Authorizer for KRaft mode that does not depend on ZooKeeper. This PR introduces such an authorizer, called StandardAuthorizer. See KIP-801 for a full description of the new Authorizer design.

Authorizer.java: add aclCount API as described in KIP-801. StandardAuthorizer is currently the only authorizer that implements it, but eventually we may implement it for AclAuthorizer and others as well.

ControllerApis.scala: fix a bug where createPartitions was authorized using CREATE on the topic resource rather than ALTER on the topic resource as it should have been.

QuorumTestHarness: rename the controller endpoint to CONTROLLER for consistency (the brokers already called it that). This is relevant in AuthorizerIntegrationTest where we are examining endpoint names. Also add the controllerServers call.

TestUtils.scala: adapt the ACL functions to be usable from KRaft, by ensuring that they use the Authorizer from the current active controller.

BrokerMetadataPublisher.scala: add broker-side ACL application logic.

Controller.java: add ACL APIs. Also add a findAllTopicIds API in order to make junit tests that use KafkaServerTestHarness#getTopicNames and KafkaServerTestHarness#getTopicIds work smoothly.

AuthorizerIntegrationTest.scala: convert over testAuthorizationWithTopicExisting (more to come soon)

QuorumController.java: add logic for replaying ACL-based records. This means storing them in the new AclControlManager object, and integrating them into controller snapshots. It also means applying the changes in the Authorizer, if one is configured. In renounce, when reverting to a snapshot, also set newBytesSinceLastSnapshot to 0.

Reviewers: YeonCheol Jang <YeonCheolGit@users.noreply.github.com>,  Jason Gustafson <jason@confluent.io>
2022-02-09 10:38:52 -08:00
Colin Patrick McCabe 68a19539cf
KAFKA-13552: Fix BROKER and BROKER_LOGGER in KRaft (#11657)
Currently, KRaft does not support setting BROKER_LOGGER configs (it always fails.) Additionally,
there are several bugs in the handling of BROKER configs. They are not properly validated on the
forwarding broker, and the way we apply them is buggy as well. This PR fixes those issues.

KafkaApis: add support for doing validation and log4j processing on the forwarding broker. This
involves breaking the config request apart and forwarding only part of it. Adjust KafkaApisTest to
test the new behavior, rather than expecting forwarding of the full request.

MetadataSupport: remove MetadataSupport#controllerId since it duplicates the functionality of
MetadataCache#controllerId. Add support for getResourceConfig and maybeForward.

ControllerApis: log an error message if the handler throws an exception, just like we do in
KafkaApis.

ControllerConfigurationValidator: add JavaDoc.

Move some functions that don't involve ZK from ZkAdminManager to DynamicConfigManager. Move some
validation out of ZkAdminManager and into a new class, ConfigAdminManager, which is not tied to ZK.

ForwardingManager: add support for sending new requests, rather than just forwarding existing
requests.

BrokerMetadataPublisher: do not try to apply dynamic configurations for brokers other than the
current one. Log an INFO message when applying a new dynamic config, like we do in ZK mode. Also,
invoke reloadUpdatedFilesWithoutConfigChange when applying a new non-default BROKER config.

QuorumController: fix a bug in ConfigResourceExistenceChecker which prevented cluster configs from
being set. Add a test for this class.

Reviews: José Armando García Sancio <jsancio@users.noreply.github.com>
2022-01-21 17:00:21 -07:00
Colin Patrick McCabe 486cbd4793
MINOR: RegisterBroker should use an atomic append (#11700)
The batch of records that registers a new broker should be committed atomically. This doesn't matter
right now, because we only create a single record to register the broker. But if we create multiple
records in the future, this could matter. In order to avoid confusion, this should use
ControllerResult#atomicOf.

Reviewers: David Arthur <mumrah@gmail.com>
2022-01-21 10:09:11 -08:00
José Armando García Sancio 1aa26faf88
MINOR: Fix EventQueueProcessingTimeMs metric #11668
Make sure that the event queue processing time histogram gets updated
and add tests that verify that the update methods modify the correct
histogram.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2022-01-11 21:48:39 -08:00
Colin Patrick McCabe 8e301b48e9
KAFKA-13528: KRaft RegisterBroker should validate that the cluster ID matches (#11593)
The KRaft controller should validate that the clusterID matches before allowing a broker to register in
the cluster.

Reviewers: José Armando García Sancio <jsancio@gmail.com>
2022-01-06 10:28:36 -08:00
Colin P. Mccabe 2cd96f0e64 MINOR: some code cleanups in the controller 2021-12-10 15:29:50 -08:00
Colin Patrick McCabe 454d63c158
KAFKA-13515: Fix KRaft config validation issues (#11577)
Require that topics exist before topic configurations can be created for them.

Merge the code from ConfigurationControlManager#checkConfigResource into
ControllerConfigurationValidator to avoid duplication.

Add KRaft support to DynamicConfigChangeTest.

Split out tests in DynamicConfigChangeTest that don't require a cluster into
DynamicConfigChangeUnitTest to save test time.

Reviewers: David Arthur <mumrah@gmail.com>
2021-12-10 12:29:58 -08:00
Colin P. Mccabe a957748d2e KAFKA-13490: Fix createTopics and incrementalAlterConfigs for KRaft mode #11416
For CreateTopics, fix a bug where if one createTopics in a batch failed, they would all fail with
the same error code.  Make the error message for TOPIC_ALREADY_EXISTS consistent with the ZK-based
code by including the topic name.

For IncrementalAlterConfigs, before we allow topic configurations to be set, we should check that
they are valid. (This also applies to newly created topics.) IncrementalAlterConfigs should ignore
non-null payloads for DELETE operations. Previously we would return an error in these cases.
However, this is not compatible with the old ZK-based code, which ignores the payload in these
cases.

Reviewers: José Armando García Sancio <jsancio@gmail.com>, Jason Gustafson <jason@confluent.io>
2021-12-05 16:32:56 -08:00
loboya~ 42306ba267
KAFKA-12932: Interfaces for SnapshotReader and SnapshotWriter (#11529)
Change the snapshot API so that SnapshotWriter and SnapshotReader are interfaces. Change the existing types SnapshotWriter and SnapshotReader to use a different name and to implement the interfaces introduced by this commit.

Co-authored-by: loboxu <loboxu@tencent.com>
Reviews: José Armando García Sancio <jsancio@users.noreply.github.com>
2021-11-30 11:44:39 -07:00
Colin Patrick McCabe e8b53caab4 KAFKA-13357; Store producer IDs in broker snapshots
When creating snapshots, controllers generate a ProducerIdsRecord indicating the highest producer ID
that has been used so far. Brokers should generate the same record, so that the snapshots can be
compared.

Also, fix a bug in MetadataDelta#finishSnapshot. The current logic will produce the wrong result if
all objects of a certain type are completely removed in the snapshot. The fix is to unconditionally
create each delta object.

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
2021-11-24 11:06:09 -07:00
Jorge Esteban Quilcate Otoya 214b59b3ec
KAFKA-13429: ignore bin on new modules (#11415)
Reviewers: John Roesler <vvcephei@apache.org>
2021-11-10 14:36:24 -06:00
Colin Patrick McCabe af8100b94f
KAFKA-13340: Change ZooKeeperTestHarness to QuorumTestHarness (#11417)
Change ZooKeeperTestHarness to QuorumTestHarness so that integration tests which inherit from
this class can test Kafka in both ZK and KRaft mode. Test cases which do this can specify the
modes they support by including a ParameterizedTest annotation before each test case, like the
following:

@ParameterizedTest
@valuesource(strings = Array("zk", "kraft"))
def testValidCreateTopicsRequests(quorum: String): Unit = { ... }

For each value that is specified here (zk, kraft), the test case will be run once in the
appropriate mode. So the test shown above is run twice. This allows integration tests to be
incrementally converted over to support KRaft mode, rather than rewritten to support it. For
now, test cases which do not specify a quorum argument will continue to run only in ZK mode.

JUnit5 makes the quorum annotation visible in the TestInfo object which each @BeforEeach
function in a test can optionally take. Therefore, this PR converts over the setUp function of
the quorum base class, plus every derived class, to take a TestInfo argument. The TestInfo
object gets "passed up the chain" to the base class, where it determines which quorum type we
create (ZK or KRaft). In a few cases, I discovered test cases inheriting from the test harness
that had more than one @BeforeEach function. Because the JUnit5 framework does not define the
order in which @BeforeEach hooks are run, I changed these to overload setUp() instead, to avoid
undefined behavior.

The general approach taken here is to make as much as possible work with KRaft, but to leave some
things as ZK-only when appropriate. For example, a test that explicitly requests an AdminZkClient
object will get an exception if it is running in KRaft mode. Similarly, tests which explicitly
request KafkaServer rather than KafkaBroker will get an exception when running in KRaft mode.
As a proof of concept, this PR converts over kafka.api.MetricsTest to support KRaft.

This PR also renames the quorum controller event handler thread to include the text
"QuorumControllerEventHandler". This allows QuorumTestHarness to check for hanging quorum
controller threads, as it does for hanging ZK-based controller threads.

Finally, ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
caused many failing test runs. Therefore, I disabled it here and filed KAFKA-13421 to fix the
test logic to be more reliable.

Reviewers: Jason Gustafson <jason@confluent.io>, Igor Soarez <soarez@apple.com>
2021-10-30 08:00:34 -07:00
Ryan Dielhenn 3f433c0b4a
KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController (#10772)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2021-10-22 12:57:36 -07:00
loboya~ 8a1d84b06a
MINOR: Fixed "snasphot" naming issue (#11203)
Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
2021-10-21 08:25:52 +02:00
Lucas Bradstreet da38a1df27
MINOR: "partition" typos and method doc arg fix (#11298)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Luke Chen <showuon@gmail.com>
2021-10-18 10:44:11 +02:00
José Armando García Sancio da58d75c43
MINOR: Fix highest offset when loading KRaft metadata snapshots (#11386)
When loading a snapshot the broker BrokerMetadataListener was using the batch's append time, offset
and epoch. These are not the same as the append time, offset and epoch from the log. This PR fixes
it to instead use the lastContainedLogTimeStamp, lastContainedLogOffset and lastContainedLogEpoch
from the SnapshotReader.

This PR refactors the MetadataImage and MetadataDelta to include an offset and epoch. It also swaps
the order of the arguments for ReplicaManager.applyDelta, in order to be more consistent with
MetadataPublisher.publish.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2021-10-12 17:19:03 -07:00
Colin Patrick McCabe 3f3a0e0d9e
KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds (#11311)
Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds and
KRaftMetadataCache#topicIdsToNames by returning a map subclass that
exposes the TopicsImage data structures without copying them.

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
2021-10-07 09:41:57 -07:00
Colin Patrick McCabe b741e9386e
KAFKA-13324: KRaft: fix validateOnly in CreateTopics (#11361)
Fix a bug where the validateOnly flag for createTopics was being ignored.

Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@gmail.com>, singingMan <3schwartz@users.noreply.github.com>
2021-09-27 16:00:41 -07:00
Colin Patrick McCabe af0c2a9370
MINOR: fix ClusterControlManager log message (#11358)
Fix a ClusterControlManager log message that should have distinguished between
newly registered and re-registered brokers, but did not due to a bug.

Reviewers: Ismael Juma <ismael@juma.me.uk>, José Armando García Sancio <jsancio@gmail.com>
2021-09-24 11:16:05 -07:00
Colin Patrick McCabe 85548acafb
KAFKA-13279: allow CreateTopicsPolicy, AlterConfigsPolicy in KRaft mode (#11310)
Add support for CreateTopicsPolicy and AlterConfigsPolicy when running in KRaft mode.

Reviewers: David Arthur <mumrah@gmail.com>, Niket Goel <ngoel@confluent.io>
2021-09-22 13:07:45 -07:00
Jason Gustafson 7de8a93c7e
KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft (#11186)
This patch fixes several problems with the `ElectLeaders` API in KRaft:

- `KafkaApis` did not properly forward this request type to the controller.
- `ControllerApis` did not handle the request type.
- `ElectLeadersRequest.getErrorResponse` may raise NPE when `TopicPartitions` is null.
- Controller should not do preferred election if `ElectLeaders` specifies `UNCLEAN` election.
- Controller should not do unclean election if `ElectLeaders` specifies `PREFERRED` election.
- Controller should use proper error codes to handle cases when desired leader is unavailable or when no election is needed because a desired leader is already elected.
- When election for all partitions is requested (indicated with null `TopicPartitions` field), the response should not return partitions for which no election was necessary.

In addition to extending the unit test coverage in `ReplicationControlManagerTest`, I have also converted `LeaderElectionCommandTest` to use KRaft.

Reviewers: dengziming <swzmdeng@163.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, David Arthur <mumrah@gmail.com>
2021-09-15 08:52:45 -07:00
José Armando García Sancio 9bcf4a525b
KAFKA-13198: Stop replicas when reassigned (#11216)
Stop the replica and resign the coordinators when a replica gets reassigned away from a topic partition.

1. Implement localChanges in TopicsDelta and TopicDelta to return all of the partitions that were deleted, became leader and became follower for the given broker id.
2. Add tests for TopicsDelta::localChanges
3. Resign coordinators that were moved away from the consumer offset and transaction topic partitions.
4. Add replica manager tests for testing reassignment of replicas and removal of topic.
5. Add a new type LocalReplicaChanges that encapsulates topic partitions deleted, became leader and became follower.

Reviewers: Jun Rao <junrao@gmail.com>
2021-08-17 13:10:03 -07:00
Niket b1b872bd4e
KAFKA-13173; Ensure KRaft controller handles concurrent broker expirations correctly (#11191)
Prior to this patch, the controller did not accumulate ISR/leader changes correctly when multiple broker's sessions expired at the same time. This patch fixes the problem by having the controller handle one expiration at a time.

Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
2021-08-12 09:38:44 -07:00
José Armando García Sancio 0837fba997
KAFKA-13161; Update replica partition state and replica fetcher state on follower update (#11189)
When processing the topics delta, make sure that the replica manager partition state and replica fetcher state matches the information included in the topic delta. Also ensure that delayed operations are processed after the follower state change has been made since that is what allows them to be completed.

Reviewers: Jason Gustafson <jason@confluent.io>
2021-08-10 12:06:30 -07:00
dengziming b980ca8709
KAFKA-12158; Better return type of RaftClient.scheduleAppend (#10909)
This patch improves the return type for `scheduleAppend` and `scheduleAtomicAppend`. Previously we were using a `Long` value and using both `null` and `Long.MaxValue` to distinguish between different error cases. In this PR, we change the return type to `long` and only return a value if the append was accepted. For the error cases, we instead throw an exception. For this purpose, the patch introduces a couple new exception types: `BufferAllocationException` and `NotLeaderException`.

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
2021-08-02 14:47:03 -07:00
José Armando García Sancio 4efd9bf0cf
KAFKA-13114; Revert state and reregister raft listener (#11116)
RaftClient's scheduleAppend may split the list of records into multiple
batches. This means that it is possible for the active controller to
see a committed offset for which it doesn't have an in-memory snapshot.

If the active controller needs to renounce and it is missing an
in-memory snapshot, then revert the state and reregister the Raft
listener. This will cause the controller to replay the entire metadata
partition.

Reviewers: Jason Gustafson <jason@confluent.io>
2021-08-01 15:26:04 -07:00
Ron Dagostino 976409ee08
KAFKA-13137; KRaft Controller Metric MBean names incorrectly quoted (#11131)
Controller metric names that are in common between the ZooKeeper-based and KRaft-based controller must remain the same, but they were not in the AK 2.8 early access release of KRaft. For example, the non-KRaft MBean name `kafka.controller:type=KafkaController,name=OfflinePartitionsCount` incorrectly became `"kafka.controller":type="KafkaController",name="OfflinePartitionCount"` (note the added quotes and the lack of plural).  This patch fixes the issues, closes the test gap that allowed the divergence to occur, and adds de-registration logic to remove the metrics when the controller is closed (this logic was missing).

Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
2021-07-29 13:01:19 -07:00
José Armando García Sancio 55d9acad65
KAFKA-13113; Support unregistering Raft listeners (#11109)
This patch adds support for unregistering listeners to `RaftClient`. 

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
2021-07-23 21:54:44 -07:00
Ryan Dielhenn 56ef910358
KAFKA-13104: Controller should notify raft client when it resigns #11082
When the active controller encounters an event exception it attempts to renounce leadership.
Unfortunately, this doesn't tell the RaftClient that it should attempt to give up leadership. This
will result in inconsistent state with the RaftClient as leader but with the controller as
inactive.  This PR changes the implementation so that the active controller asks the RaftClient
to resign.

Reviewers: Jose Sancio <jsancio@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2021-07-20 16:41:20 -07:00
José Armando García Sancio 69a4661d7a
KAFKA-13100: Create KRaft controller snapshot during promotion (#11084)
The leader assumes that there is always an in-memory snapshot at the last
committed offset. This means that the controller needs to generate an in-memory
snapshot when getting promoted from inactive to active.  This PR adds that
code. This fixes a bug where sometimes we would try to look for that in-memory
snapshot and not find it.

The controller always starts inactive, and there is no requirement that there
exists an in-memory snapshot at the last committed offset when the controller
is inactive. Therefore we can remove the initial snapshot at offset -1.

We should also optimize when a snapshot is cancelled or completes, by deleting
all in-memory snapshots less that the last committed offset.

SnapshotRegistry's createSnapshot should allow the creating of a snapshot if
the last snapshot's offset is the given offset. This allows for simpler client
code. Finally, this PR renames createSnapshot to getOrCreateSnapshot.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2021-07-20 10:13:01 -07:00
Colin Patrick McCabe e07de97a4c
KAFKA-12803: Support reassigning partitions when in KRaft mode (#10753)
Support the KIP-455 reassignment API when in KRaft mode. Reassignments
which merely rearrange partitions complete immediately. Those that only
remove a partition complete immediately if the ISR would be non-empty
after the specified removals. Reassignments that add one or more
partitions follow the KIP-455 pattern of adding all the adding replicas
to the replica set, and then waiting for the ISR to include all the new
partitions before completing. Changes to the partition sets are
accomplished via PartitionChangeRecord.

Reviewers: Jun Rao <junrao@gmail.com>
2021-07-15 11:41:51 -07:00
Colin Patrick McCabe 301a07f4d8
KAFKA-13083: Fix KRaft ISR in createPartitions, createTopics
When creating a new topic or partition via a manual partition assignment, we
must check if the nodes are unfenced before adding them to the new ISR.
We also need to reject attempts to create a new partition with all fenced
nodes.

Reviewers: David Arthur <mumrah@gmail.com>
2021-07-14 10:52:25 -07:00
Jason Gustafson d20865ae79
KAFKA-13053; Bump kraft frame version for incompatible changes from 2.8 (#11010)
This patch bumps the default frame version for kraft records from 0 to 1. At the same time, we reset all
records versions back to 0 and we enable flexible version support for UnregisterBrokerRecord, which was
missed previously. Note that the frame version bump also affects the KIP-405 records since they are
sharing AbstractApiMessageSerde. Since these records were not part of any previous releases, this should
not cause an issue.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2021-07-09 12:18:34 -07:00
Colin Patrick McCabe 526fdfb97b
MINOR: the broker should use metadata.log.max.record.bytes.between.snapshots (#10990)
The broker should trigger a snapshot once
metadata.log.max.record.bytes.between.snapshots has been exceeded.

Reviewers: Jason Gustafson <jason@confluent.io>
2021-07-09 12:00:26 -07:00
David Arthur 03890ff1d1
MINOR: Fix NPE from addingReplicas and removingReplicas (#10992)
Fix NPE from addingReplicas and removingReplicas. Make addingReplicas and 
removingReplicas in PartitionRecord non-nullable as described in KIP-746.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2021-07-07 15:39:43 -07:00
Colin Patrick McCabe 7bd55f5156
KAFKA-12998: Implement broker-side KRaft snapshots (#10931)
This PR implements broker-side KRaft snapshots, including both saving and
loading. The code for triggering a periodic broker-side snapshot will come in a
follow-on PR. Loading should work with just this PR. It also implements
reloading broker snapshots after initialization.

In order to facilitate snapshots, this PR introduces the concept of
MetadataImage and MetadataDelta. MetadataImage represents the metadata state
retained in memory. It is basically a generalization of MetadataCache that
includes a few things that MetadataCache does not (such as features and client
quotas.) KRaftMetadataCache is now an accessor for the data stored in this object.
Similarly, MetadataImage replaces CacheConfigRespository and ClientQuotaCache.
It also subsumes kafka.server.metadata.MetadataImage and related classes.

MetadataDelta represents a change to a MetadataImage. When a KRaft snapshot is
loaded, we will accumulate all the changes into a MetadataDelta first, prior to
applying it. If we must reload a snapshot because we fell too far behind while
consuming metadata, the resulting MetadataDelta will contain all the changes
needed to catch us up. During normal operation, MetadataDelta is also used to
accumulate the changes of each incoming batch of metadata records. These
incremental deltas should be relatively small.

I have removed the logic for updating the various manager objects from
BrokerMetadataListener and placed it into BrokerMetadataPublisher. This makes
it easier to unit test BrokerMetadataListener.

Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
2021-07-06 16:36:01 -07:00
José Armando García Sancio 9f01909dc3
KAFKA-12997: Expose the append time for batches from raft (#10946)
Add the record append time to Batch. Change SnapshotReader to set this time to the
time of the last log in the last batch. Fix the QuorumController to remember the last
committed batch append time and to store it in the generated snapshot.

Reviewers: David Arthur <mumrah@gmail.com>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2021-07-01 16:38:59 -07:00
Colin Patrick McCabe b4e45cd0d2
KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots (#10949)
Create the image/ module for storing, reading, and writing broker metadata images.
Metadata images are immutable. New images are produced from existing images
using delta classes. Delta classes are mutable, and represent changes to a base
image.

MetadataImage objects can be converted to lists of KRaft metadata records. This
is essentially writing a KRaft snapshot. The resulting snapshot can be read
back into a MetadataDelta object. In practice, we will typically read the
snapshot, and then read a few more records to get fully up to date. After that,
the MetadataDelta can be converted to a MetadataImage as usual.

Sometimes, we have to load a snapshot even though we already have an existing
non-empty MetadataImage. We would do this if the broker fell too far behind and
needed to receive a snapshot to catch up. This is handled just like the normal
snapshot loading process. Anything that is not in the snapshot will be marked
as deleted in the MetadataDelta once finishSnapshot() is called.

In addition to being used for reading and writing snapshots, MetadataImage also
serves as a cache for broker information in memory. A follow-up PR will replace
MetadataCache, CachedConfigRepository, and the client quotas cache with the
corresponding Image classes. TopicsDelta also replaces the "deferred
partition" state that the RaftReplicaManager currently implements. (That change
is also in a follow-up PR.)

Reviewers: Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>
2021-07-01 00:08:25 -07:00
José Armando García Sancio 1b7ab8eb9f
KAFKA-12863: Configure controller snapshot generation (#10812)
Add the ability for KRaft controllers to generate snapshots based on the number of new record bytes that have 
been applied since the last snapshot. Add a new configuration key to control this parameter. For now, it
defaults to being off, although we will change that in a follow-on PR. Also, fix LocalLogManager so that
snapshot loading is only triggered when the listener is not the leader.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2021-06-30 18:13:53 -07:00
Niket d3ec9f940c
KAFKA-12952 Add header and footer records for raft snapshots (#10899)
Add header and footer records for raft snapshots. This helps identify when the snapshot
starts and ends. The header also contains a time.  The time field is currently set to 0.
KAFKA-12997 will add in the necessary wiring to use the correct timestamp.

Reviewers: Jose Sancio <jsancio@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
2021-06-29 09:37:20 -07:00
José Armando García Sancio c333bfd417
MINOR: Add reset to SnapshotRegistry and Revertable (#10891)
Add reset functionality to SnapshotRegitry and Revertable, so that we can
clear the current state before loading a snapshot.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2021-06-18 17:08:54 -07:00
José Armando García Sancio 4ddf1a5f94
KAFKA-12837; Process entire batch reader in the `BrokerMetadataListener` commit handler (#10902)
We should process the entire batch in `BrokerMetadataListener` and make sure that `hasNext` is called before calling `next` on the iterator. The previous code worked because the raft client kept track of the position in the iterator, but it caused NoSuchElementException to be raised when the reader was empty (as might be the case with control records).

Reviewers: Jason Gustafson <jason@confluent.io>
2021-06-18 13:20:43 -07:00
Colin Patrick McCabe f0bb85cee3
KAFKA-12931: KIP-746: Revise KRaft Metadata Records (#10867)
Implement the metadata records changes described in KIP-746. Fix unit
tests as needed.

Reviewers: David Arthur <mumrah@gmail.com>
2021-06-15 16:39:18 -07:00
Colin Patrick McCabe 135de5801e
KAFKA-12877: Make flexibleVersions mandatory (#10804)
Many Kafka protocol JSON files were accidentally configured to not use
flexible versions, since it was not on by default.  This PR requires
JSON files to specify a flexibleVersions value. If the JSON file does
not specify the flexibleVersions value, display an error message
suggesting the correct value to use for new messages.

Reviewers: Jason Gustafson <jason@confluent.io>
2021-06-15 16:04:30 -07:00
José Armando García Sancio b67a77d5b9
KAFKA-12787; Integrate controller snapshoting with raft client (#10786)
Directly use `RaftClient.Listener`, `SnapshotWriter` and `SnapshotReader` in the quorum controller.

1. Allow `RaftClient` users to create snapshots by specifying the last committed offset and last committed epoch. These values are validated against the log and leader epoch cache.
2. Remove duplicate classes in the metadata module for writing and reading snapshots.
3. Changed the logic for comparing snapshots. The old logic was assuming a certain batch grouping. This didn't match the implementation of the snapshot writer. The snapshot writer is free to merge batches before writing them.
4. Improve `LocalLogManager` to keep track of multiple snapshots.
5. Improve the documentation and API for the snapshot classes to highlight the distinction between the offset of batches in the snapshot vs the offset of batches in the log. These two offsets are independent of one another. `SnapshotWriter` and `SnapshotReader` expose a method called `lastOffsetFromLog` which represents the last inclusive offset from the log that is represented in the snapshot.

Reviewers: dengziming <swzmdeng@163.com>, Jason Gustafson <jason@confluent.io>
2021-06-15 10:32:01 -07:00
Colin Patrick McCabe 5936a249bd
KAFKA-12934: Move some controller classes to the metadata package (#10865)
Move some controller classes to the metadata package so that they can be
used with broker snapshots. Rename ControllerTestUtils to
RecordTestUtils. Create LeaderConstants and PartitionRegistration.

Reviewers: Jason Gustafson <jason@confluent.io>
2021-06-10 17:34:15 -07:00
Ron Dagostino 43db8ac86a
KAFKA-12897: KRaft multi-partition placement on single broker (#10823)
#10494 introduced a bug in the KRaft controller where the controller will loop forever in StripedReplicaPlacer trying to identify the racks on which to place partition replicas if there is a single unfenced broker in the cluster and the number of requested partitions in a CREATE_TOPICS request is greater than 1.

This patch refactors out some argument sanity checks and invokes those checks in both RackList and StripedReplicaPlacer, and it adds tests for this as well as the single broker placement issue.

Reviewers: Jun Rao <junrao@gmail.com>
2021-06-07 14:13:06 -07:00
dengziming 51665b9f39
KAFKA-12338; Remove unused `MetadataParser` (#10793)
`MetadataParser` is a duplication of `MetadataRecordSerde` and it's not used in any code, so we can remove it. It did, however, have some useful validations which have been moved into `MetadataRecordSerde`.

Reviewers: Jason Gustafson <jason@confluent.io>
2021-06-07 10:52:16 -07:00
David Arthur e97cff2702
KAFKA-12620 Allocate Producer IDs in KRaft controller (#10752)
This is part 2 of KIP-730. Part 1 was in #10504.

This PR adds QuorumController support for handling AllocateProducerIDs requests
and managing the state of the latest producer ID block in the controller by committing
this state to the metadata log.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2021-06-03 16:23:32 -07:00
Colin Patrick McCabe ccec9b0c0d
KAFKA-12864: Move KafkaEventQueue into server-common. #10787 (#10787)
Since KafkaEventQueue is a generic data structure not specific to metadata, move it
into the server-common module.

Reviewers: Ismael Juma <ismael@juma.me.uk>, David Arthur <mumrah@gmail.com>
2021-06-02 16:10:59 -07:00
Ismael Juma e4b3a3cdeb
MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout` (#10759)
New parameters in overloaded methods should appear later apart from
lambdas that should always be last.
2021-05-27 06:25:00 -07:00
Colin P. Mccabe c714e67685 HOTFIX: fix checkstyle issue in KAFKA-12697 2021-05-20 16:39:52 -07:00
Ryan Dielhenn 80ec8fbcd5
KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller (#10572)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2021-05-20 16:28:32 -07:00
José Armando García Sancio f50f13d781
KAFKA-12342: Remove MetaLogShim and use RaftClient directly (#10705)
This patch removes the temporary shim layer we added to bridge the interface
differences between MetaLogManager and RaftClient. Instead, we now use the
RaftClient directly from the metadata module.  This also means that the
metadata gradle module now depends on raft, rather than the other way around.
Finally, this PR also consolidates the handleResign and handleNewLeader APIs
into a single handleLeaderChange API.

Co-authored-by: Jason Gustafson <jason@confluent.io>
2021-05-20 15:39:46 -07:00
Colin Patrick McCabe 9e5b77fb96
KAFKA-12788: improve KRaft replica placement (#10494)
Implement a striped replica placement algorithm for KRaft. This also
means implementing rack awareness.  Previously, KRraft just chose
replicas randomly in a non-rack-aware fashion.  Also, allow replicas to
be placed on fenced brokers if there are no other choices.  This was
specified in KIP-631 but previously not implemented.

Reviewers: Jun Rao <junrao@gmail.com>
2021-05-17 16:49:47 -07:00
Colin Patrick McCabe 0b9a7bd75e
KAFKA-12792: Fix metrics bug and introduce TimelineInteger (#10707)
Introduce a TimelineInteger class which represents a single integer
value which can be changed while maintaining snapshot consistency. Fix a
case where a metric value would be corrupted after a snapshot restore.

Reviewers: David Arthur <mumrah@gmail.com>
2021-05-17 10:21:26 -07:00
Colin Patrick McCabe f20fdbd839
KAFKA-12778: Fix QuorumController request timeouts and electLeaders (#10688)
The QuorumController should honor the timeout for RPC requests
which feature a timeout. For electLeaders, attempt to trigger a leader
election for all partitions when the request specifies null for the topics
argument.

Reviewers: David Arthur <mumrah@gmail.com>
2021-05-14 12:44:16 -07:00
Ryan Dielhenn e69571aecc
KAFKA-12697: Add Global Topic and Partition count metrics to the Quorum Controller (#10679)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
2021-05-13 12:08:18 -07:00
Satish Duggana 7ef3879429
KAFKA-12758 Added `server-common` module to have server side common classes. (#10638)
Added server-common module to have server side common classes. Moved ApiMessageAndVersion, RecordSerde, AbstractApiMessageSerde, and BytesApiMessageSerde to server-common module.

Reivewers:  Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
2021-05-11 09:58:28 -07:00
Colin Patrick McCabe 8d38189edd
MINOR: clean up some replication code (#10564)
Centralize leader and ISR changes in generateLeaderAndIsrUpdates.
Consolidate handleNodeDeactivated and handleNodeActivated into this
function.

Rename BrokersToIsrs#noLeaderIterator to BrokersToIsrs#partitionsWithNoLeader.
Create BrokersToIsrs#partitionsLedByBroker, BrokersToIsrs#partitionsWithBrokerInIsr

In ReplicationControlManagerTest, createTestTopic should be a member
function of ReplicationControlTestContext.  It should invoke
ReplicationControlTestContext#replay so that records are applied to all
parts of the test context.

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2021-04-29 11:20:30 -07:00
Colin Patrick McCabe 31b4ba8c5a
MINOR: fix some bugs in ControllerApis.scala (#10505)
Fix some cases where ControllerApis was blocking on the controller
thread.  This should not be necessary, since the controller thread can
just interface directly with the network threads.

Fix some cases where ControllerApis wasn't doing authorization
correctly.  Since the previous release of KRaft did not support
authorizers, this bug is not as severe as it could have been, but it
still needs to be fixed.  Add authorization unit tests for each API.

Add support for the deprecated ALTER_CONFIGS API in ControllerApis.  It
was already supported in QuorumController, but wasn't exposed
previously.

Fix how we validate duplicate config resources and unknown config
resource types in ControllerApis.  Duplicates should yield an
INVALID_REQUEST error, and unknown resource types should give an error
with the corresponding numerical resource type and UNSUPPORTED_VERSION.
Fix some redaction code in RequestChannel that was throwing an exception
when duplicate config resources were present in the request.

Fix a comment in ControllerApis#deleteTopics that no longer reflects
what the code is doing when we don't have "describe" permission.

Add function stubs for the KIP-455 reassignment APIs in ControllerApis
and QuorumController.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>
2021-04-15 10:45:24 -07:00
Colin Patrick McCabe a8a6952e4a
KAFKA-12471: Implement createPartitions in KIP-500 mode (#10343)
Implement the createPartitions RPC which adds more partitions to a topic
in the KIP-500 controller.  Factor out some of the logic for validating
manual partition assignments, so that it can be shared between
createTopics and createPartitions.  Add a startPartition argument to the
replica placer.

Reviewers: Jason Gustafson <jason@confluent.io>
2021-04-13 11:00:22 -07:00
David Arthur 5964401bf9
KAFKA-12406 Integrate client quotas with KRaft broker (#10254) 2021-04-08 13:56:21 -04:00
Colin Patrick McCabe e28924062f
MINOR: move NoOpSnapshotWriter to main (#10496)
Move NoOpSnapshotWriter and NoOpSnapshotWriterBuilder out of the test
directory and into the main directory, until we implement the KRaft
integration.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
2021-04-06 22:29:26 -07:00
Colin Patrick McCabe 7bc84d6ced
KAFKA-12467: Implement QuorumController snapshot generation (#10366)
Implement controller-side snapshot generation.Implement QuorumController snapshot
generation.  Note that this PR does not handle KRaft integration, just the internal 
snapshot record generation and consumption logic.

Reading a snapshot is relatively straightforward.  When the  QuorumController
starts up, it loads the most recent snapshot.  This is just a series of records
that we replay, plus a log offset ("snapshot epoch") that we advance to.

Writing a snapshot is more complex.  There are several components:
the SnapshotWriter which persists the snapshot, the SnapshotGenerator
which manages writing each batch of records, and the SnapshotGeneratorManager
which interfaces the preceding two classes with the event queue.

Controller snapshots are done incrementally.  In order to avoid blocking the
controller thread for a long time, we pull a few record batches at a time from
our record batch iterators.  These iterators are implemented by controller
manager classes such as ReplicationControlManager, ClusterControlManager, etc.

Finally, this PR adds ControllerTestUtils#deepSortRecords and
ControllerTestUtils#assertBatchIteratorContains, which make it easier to write
unit tests.  Since records are often constructed from unsorted data structures,
it is often useful to sort them before comparing them.

Reviewers: David Arthur <mumrah@gmail.com>
2021-04-06 10:18:06 -07:00
dengziming 798672fa14
MONOR: Remove redudant LocalLogManager (#10325)
Remove an unused copy of LocalLogManager (a test class) that ended
up in the main code directory.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2021-04-01 18:37:10 -07:00
Ismael Juma 16b2d4f3a7
MINOR: Self-managed -> KRaft (Kafka Raft) (#10414)
`Self-managed` is also used in the context of Cloud vs on-prem and it can
be confusing.

`KRaft` is a cute combination of `Kafka Raft` and it's pronounced like `craft`
(as in `craftsmanship`).

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jose Sancio <jsancio@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Ron Dagostino <rdagostino@confluent.io>
2021-03-29 15:39:10 -07:00
dengziming e6f8ca80cd
MINOR: Fix log statement whose placeholders are inconsistent with arguments (#10312)
1. When the 2nd argument is an exception we don't need a placeholder
2. Placeholders should equal to arguments.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2021-03-22 13:39:05 +08:00
Ismael Juma 7c7e8078e4
MINOR: Use self-managed mode instead of KIP-500 and nozk (#10362)
KIP-500 is not particularly descriptive. I also tweaked the readme text a bit.

Tested that the readme for self-managed still works after these changes.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ron Dagostino <rdagostino@confluent.io>, Jason Gustafson <jason@confluent.io>
2021-03-19 16:42:37 -07:00
Ron Dagostino d9bb2ef596
MINOR: The new KIP-500 code should treat cluster ID as a string (#10357)
Cluster ID has traditionally been treated as a string by the Kafka protocol (for example,
AdminClient returns it as a string).  The new KIP-500 code should continue this practice.  If
we don't do this, upgrading existing clusters may be harder to do.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
2021-03-19 09:56:04 -07:00
Colin Patrick McCabe f6884cc6dd
MINOR: Fix BaseHashTable sizing (#10334)
The array backing BaseHashTable is intended to be sized as a power of
two.  Due to a bug, the initial array size was calculated incorrectly
in some cases.

Also make the maximum array size the largest possible 31-bit power of
two.  Previously it was a smaller size but this was due to a typo.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jose Sancio <jsancio@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2021-03-18 09:58:49 -07:00
dengziming 8c9bc9c640
MINOR: Add entityType for metadata record definitions (#10116)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2021-03-10 13:27:06 +08:00
Tom Bentley a1fb80ffe2
MINOR: Add missing log argument (#10262)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2021-03-09 12:46:43 +08:00
Jason Gustafson 31a121c3b7
KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received (#10252)
This patch implements additional handling logic for `RemoveTopic` records:

- Update `MetadataPartitions` to ensure addition of deleted partitions to `localRemoved` set
- Ensure topic configs are removed from `ConfigRepository`
- Propagate deleted partitions to `GroupCoordinator` so that corresponding offset commits can be removed

This patch also changes the controller topic id generation logic to use `Uuid.randomUuid` rather than `Random`.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
2021-03-08 11:21:42 -08:00
Jason Gustafson 60a097ae40
HOTFIX: Controller topic deletion should be atomic (#10264)
Topic deletions should be atomic. This fixes a build error caused by merging of both https://github.com/apache/kafka/pull/10253 and https://github.com/apache/kafka/pull/10184 at about the same time. 

Reviewers: David Arthur <mumrah@gmail.com>
2021-03-04 12:19:34 -08:00
Colin Patrick McCabe eebc6f279e
MINOR: Enable topic deletion in the KIP-500 controller (#10184)
This patch enables delete topic support for the new KIP-500 controller. Also fixes the following:
- Fix a bug where feature level records were not correctly replayed.
- Fix a bug in TimelineHashMap#remove where the wrong type was being returned.

Reviewers: Jason Gustafson <jason@confluent.io>, Justine Olshan <jolshan@confluent.io>, Ron Dagostino <rdagostino@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>

Co-authored-by: Jason Gustafson <jason@confluent.io>
2021-03-04 11:28:20 -08:00
José Armando García Sancio 96a2b7aac4
KAFKA-12376: Apply atomic append to the log (#10253) 2021-03-04 13:55:43 -05:00
Jason Gustafson a8eda3c785
KAFKA-12367; Ensure partition epoch is propagated to `Partition` state (#10200)
This patch fixes two problem with the AlterIsr handling of the quorum controller:

- Ensure that partition epoch is updated correctly after partition change records and is
propagated to Partition

- Ensure that AlterIsr response includes partitions that were successfully updated

As part of this patch, I've renamed BrokersToIsrs.TopicPartition to
BrokersToIsrs.TopicIdPartition to avoid confusion with the TopicPartition object which is
used virtually everywhere. I've attempted to address some of the testing gaps as welll.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
2021-02-25 14:17:19 -08:00
Colin Patrick McCabe 5eac5a822f
KAFKA-12276: Add the quorum controller code (#10070)
The quorum controller stores metadata in the KIP-500 metadata log, not in Apache
ZooKeeper. Each controller node is a voter in the metadata quorum. The leader of the
quorum is the active controller, which processes write requests. The followers are standby
controllers, which replay the operations written to the log. If the active controller goes away,
a standby controller can take its place.

Like the ZooKeeper-based controller, the quorum controller is based on an event queue
backed by a single-threaded executor. However, unlike the ZK-based controller, the quorum
controller can have multiple operations in flight-- it does not need to wait for one operation
to be finished before starting another. Therefore, calls into the QuorumController return
CompleteableFuture objects which are completed with either a result or an error when the
operation is done. The QuorumController will also time out operations that have been
sitting on the queue too long without being processed. In this case, the future is completed
with a TimeoutException.

The controller uses timeline data structures to store multiple "versions" of its in-memory 
state simultaneously. "Read operations" read only committed state, which is slightly older
than the most up-to-date in-memory state. "Write operations" read and write the latest
in-memory state. However, we can not return a successful result for a write operation until
its state has been committed to the log. Therefore, if a client receives an RPC response, it
knows that the requested operation has been performed, and can not be undone by a
controller failover.

Reviewers: Jun Rao <junrao@gmail.com>, Ron Dagostino <rdagostino@confluent.io>
2021-02-19 18:03:23 -08:00
Colin P. Mccabe 690f72dd69 KAFKA-12334: Add the KIP-500 metadata shell
The Kafka Metadata shell is a new command which allows users to
interactively examine the metadata stored in a KIP-500 cluster.
It can examine snapshot files that are specified via --snapshot.

The metadata tool works by replaying the log and storing the state into
in-memory nodes.  These nodes are presented in a fashion similar to
filesystem directories.

Reviewers: Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>, Igor Soarez <soarez@apple.com>
2021-02-19 15:46:34 -08:00
David Arthur 25555b89f5
MINOR: Add BrokerMetadataListener (#10111)
This adds BrokerMetadataListener which is responsible for processing metadata records received by the broker when running in Raft mode.

This also moves some classes that were added to the wrong folder in trunk

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ron Dagostino <rdagostino@confluent.io>
2021-02-11 12:43:21 -08:00
Colin Patrick McCabe bf5e1f1cc0
MINOR: add the MetaLogListener, LocalLogManager, and Controller interface. (#10106)
Add MetaLogListener, LocalLogManager, and related classes. These
classes are used by the KIP-500 controller and broker to interface with the
Raft log.

Also add the Controller interface. The implementation will be added in a separate PR.

Reviewers: Ron Dagostino <rdagostino@confluent.io>, David Arthur <mumrah@gmail.com>
2021-02-11 08:42:59 -08:00
Colin Patrick McCabe d98df7fc4d
MINOR: Add KafkaEventQueue (#10030)
Add KafkaEventQueue, which is used by the KIP-500 controller to manage its event queue.
Compared to using an Executor, KafkaEventQueue has the following advantages:

* Events can be given "deadlines." If an event lingers in the queue beyond the deadline, it
will be completed with a timeout exception. This is useful for implementing timeouts for
controller RPCs.

* Events can be prepended to the queue as well as appended.

* Events can be given tags to make them easier to manage. This is especially useful for
rescheduling or cancelling events which were previously scheduled to execute in the future.

Reviewers: Jun Rao <junrao@gmail.com>, José Armando García Sancio <jsancio@gmail.com>
2021-02-04 14:46:57 -08:00
Jason Gustafson f58c2acf26
KAFKA-12250; Add metadata record serde for KIP-631 (#9998)
This patch adds a `RecordSerde` implementation for the metadata record format expected by KIP-631. 

Reviewers: Colin McCabe <cmccabe@apache.org>, Ismael Juma <mlists@juma.me.uk>
2021-02-03 16:16:35 -08:00
Colin Patrick McCabe 1711cfa4eb
KAFKA-12209: Add the timeline data structures for the KIP-631 controller (#9901)
Reviewers: Jun Rao <junrao@gmail.com>
2021-02-02 11:33:55 -08:00
Colin Patrick McCabe 217334b0f4
KAFKA-12183: Add the KIP-631 metadata record definitions (#9876)
Add the metadata gradle module, which will contain the metadata record
definitions, and other metadata-related broker-side code.

Add MetadataParser, MetadataParseException, etc.

Reviewers: José Armando García Sancio <jsancio@gmail.com>, Ismael Juma <ismael@juma.me.uk>, David Arthur <mumrah@gmail.com>
2021-01-14 09:58:52 -08:00