As part of KIP-835, LastCommittedRecordOffset was added to the
KafkaController metric type. Make sure to test that metric.
Reviewers: Jason Gustafson <jason@confluent.io>
Updates relevant tests in `ReassignPartitionsIntegrationTest` for KRaft. We skip the JBOD tests since JBOD is not supported in KRaft, and we skip the `AlterPartition` upgrade tests since they are not relevant.
Reviewers: Kvicii <Karonazaba@gmail.com>, David Arthur <mumrah@gmail.com>
Fixes the timeouts we have been seeing in `LogOffsetTest` when KRaft is enabled. The problem is the dependence on `MockTime`. In the KRaft broker, we need a steadily advancing clock for events in `KafkaEventQueue` to get executed. In the case of the timeouts, the broker was stuck with the next heartbeat event in the queue. We depended on the execution of this event in order to send the next heartbeat, complete the `initialCatchUpFuture`, and finish broker startup. This caused the test to get stuck during initialization, which is probably why the `@Timeout` wasn't working. The patch fixes the problem by using system time instead, since the test did not strongly depend on `MockTime`.
Reviewers: David Arthur <mumrah@gmail.com>
This patch removes the --metadata-version option and adds a --release-version option to the kafka-storage tool. This is not a breaking change since --metadata-version was only introduced on May 18 and has not been released yet.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, dengziming <dengziming1993@gmail.com>
This PR implements the first part of KIP-841. Specifically, it implements the following:
1. Adds a new metadata version.
2. Adds the InControlledShutdown field to the BrokerRegistrationRecord and BrokerRegistrationChangeRecord and bumps their versions. The newest versions are only used if the new metadata version is enabled.
3. Writes a BrokerRegistrationChangeRecord with InControlledShutdown set when a broker requests a controlled shutdown.
4. Ensures that fenced and in-controlled-shutdown replicas are neither picked as leaders nor included in the ISR.
5. Adds or extends unit tests.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, dengziming <dengziming1993@gmail.com>, David Arthur <mumrah@gmail.com>
Fixes several race conditions in the test case that caused the flaky failure.
Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, Jason Gustafson <jason@confluent.io>
Co-authored-by: Kvicii <Karonazaba@gmail.com>
The active quorum controller periodically appends NoOpRecord to increase the metadata LEO. However, when a broker starts up, we wait until its metadata LEO catches up with the controller LEO. We generate a NoOpRecord every 500ms but only send a heartbeat request every 2000ms.
It's almost impossible for a broker to catch up with the controller LEO if the broker only queries every 2000ms while the controller LEO advances every 500ms, so the tests in KRaftClusterTest will fail.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, showuon <43372967+showuon@users.noreply.github.com>, Colin Patrick McCabe <cmccabe@confluent.io>
This PR refactors the leader API access in the follower fetch path.
Added a LeaderEndPoint interface which serves all access to the leader.
Added a LocalLeaderEndPoint and a RemoteLeaderEndPoint, which implement the LeaderEndPoint interface to handle fetches from the leader in local and remote storage respectively.
Reviewers: David Jacot <djacot@confluent.io>, Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
When LogManager starts up and loads logs, we expect to catch any IOException (e.g. an out-of-space error) and mark the log dir offline. Later, we handle the offline logDir in ReplicaManager so that the cleanShutdown file is not created when all logDirs are offline. The reason the broker shut down with a cleanShutdown file after the disk filled up is that during loadLogs and log recovery we write the leader-epoch-checkpoint file, and if an IOException is thrown there, we wrap it as a KafkaStorageException and rethrow. Since we don't catch KafkaStorageException, the exception is caught elsewhere and the broker follows the clean shutdown path.
This PR fixes the issue by catching KafkaStorageException with an IOException cause during loadLogs and marking the logDir offline so that the ReplicaManager handles the offline logDirs.
Reviewers: Jun Rao <jun@confluent.io>, Alok Thatikunta <alok123thatikunta@gmail.com>
Enable kraft support in BaseQuotaTest and its extensions.
Reviewers: Kvicii <42023367+Kvicii@users.noreply.github.com>, dengziming <dengziming1993@gmail.com>
This PR fixes a bug where FeatureControlManager#replay(FeatureLevelRecord) was throwing an
exception if not all controllers in the quorum supported the feature being applied. While we do
want to validate this, it needs to be validated earlier, before the record is committed to the log.
Once the record has been committed to the log it should always be applied if the current controller
supports it.
Fix another bug where removing a feature was not supported once it had been configured. Note that
because we reserve feature level 0 for "feature not enabled", we don't need to use
Optional<VersionRange>; we can just return a range of 0-0 when the feature is not supported.
Allow the metadata version to be downgraded when UpgradeType.UNSAFE_DOWNGRADE has been set.
Previously we were unconditionally denying this even when this was set.
Add a builder for FeatureControlManager, so that we can easily add new parameters to the
constructor in the future. This will also be useful for creating FeatureControlManagers that are
initialized to a specific MetadataVersion.
Get rid of RemoveFeatureLevelRecord, since it's easier to just issue a FeatureLevelRecord with
the level set to 0.
Set metadata.max.idle.interval.ms to 0 in RaftClusterSnapshotTest for more predictability.
Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
The kafka-dump-log command should accept files with a suffix of ".checkpoint". It should also decode and print using JSON the snapshot header and footer control records.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
Implement NoOpRecord as described in KIP-835. This is controlled by the new
metadata.max.idle.interval.ms configuration.
The KRaft controller schedules an event to write NoOpRecord to the metadata log if the metadata
version supports this feature. This event is scheduled at the interval defined in
metadata.max.idle.interval.ms. Brokers and controllers were improved to ignore the NoOpRecord when
replaying the metadata log.
This PR also adds four new metrics to the KafkaController metric group, as described in KIP-835.
Finally, there are some small fixes to leader recovery. This PR fixes a bug where metadata version
3.3-IV1 was not marked as changing the metadata. It also changes the ReplicationControlManager to
accept a metadata version supplier to determine if the leader recovery state is supported.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Added parameter `metadataDirectory` to `setMetadataDirectory()` so that `this.metadataDirectory` would not be set to itself.
Reviewers: Kvicii <42023367+Kvicii@users.noreply.github.com>, dengziming <dengziming1993@gmail.com>, Jason Gustafson <jason@confluent.io>
The test cases we have in `RequestChannelTest` for `buildResponseSend` construct the envelope request incorrectly. The request is created using the envelope context, but also a reference to the wrapped envelope request object. This patch fixes `TestUtils.buildEnvelopeRequest` so that the wrapped request is built properly. It also fixes the dependence on this incorrect construction and consolidates the tests in `RequestChannelTest` to avoid duplication.
Reviewers: dengziming <dengziming1993@gmail.com>, David Jacot <djacot@confluent.io>
Fix some bugs in the KRaft unregisterBroker API and add a junit test.
1. kafka-cluster.sh unregister should fail if no broker ID is passed.
2. UnregisterBrokerRequest must be marked as a KRaft broker API so
that KRaft brokers can receive it.
3. KafkaApis.scala must forward UNREGISTER_BROKER to the controller.
Reviewers: Jason Gustafson <jason@confluent.io>, dengziming <dengziming1993@gmail.com>, David Jacot <djacot@confluent.io>
When the kraft broker begins controlled shutdown, it immediately disables the metadata listener. This means that metadata changes as part of the controlled shutdown do not get sent to the respective components. For partitions that the broker is follower of, that is what we want. It prevents the follower from being able to rejoin the ISR while still shutting down. But for partitions that the broker is leading, it means the leader will remain active until controlled shutdown finishes and the socket server is stopped. That delay can be as much as 5 seconds and probably even worse.
This PR revises the controlled shutdown procedure as follows:
* The broker signals to the replica manager that it is about to start the controlled shutdown.
* The broker requests a controlled shutdown to the controller.
* The controller moves leaders off from the broker, removes the broker from any ISR that it is a member of, and writes those changes to the metadata log.
* When the broker receives a partition metadata change, it checks whether it is in the ISR. If it is, it updates the partition as usual. If it is not, or if there is no leader defined--as would be the case if the broker was the last member of the ISR--it stops the fetcher/replica. This effectively stops all the partitions for which the broker was part of the ISR.
When the broker is a replica of a partition but it is not in the ISR, the controller does not do anything. The leader epoch is not bumped. In this particular case, the follower will continue to run until the replica manager shuts down. In this time, the replica could become in-sync and the leader could try to bring it back to the ISR. This remaining issue will be addressed separately.
Reviewers: Jason Gustafson <jason@confluent.io>
This test is flaky, as seen in CI: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12184/1/tests/
The test fails because it does not wait for metadata to be propagated across brokers before killing a broker which may lead to it getting stale information. Note that a similar test was done in #12104 for a different test.
Reviewers: Kvicii Y, Ziming Deng, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This patch ensures consistent handling of null-valued topic configs between the zk and kraft controller. Prior to this patch, we returned INVALID_REQUEST in zk mode and it was not an error in kraft. After this patch, we return INVALID_CONFIG consistently for this case.
Reviewers: Jason Gustafson <jason@confluent.io>
When a partition leader receives a `Fetch` request from a replica which is not in the current replica set, the behavior today is to return a successful fetch response, but with empty data. This causes the follower to retry until metadata converges without updating any state on the leader side. It is clearer in this case to return an error, so that the metadata inconsistency is visible in logging and so that the follower backs off before retrying.
In this patch, we use `UNKNOWN_LEADER_EPOCH` when the `Fetch` request includes the current leader epoch. The way we see this is that the leader is validating the (replicaId, leaderEpoch) tuple. When the leader returns `UNKNOWN_LEADER_EPOCH`, it means that the leader does not expect the given leaderEpoch from that replica. If the request does not include a leader epoch, then we use `NOT_LEADER_OR_FOLLOWER`. We can take a similar interpretation for this case: the leader is rejecting the request because it does not think it should be the leader for that replica. But mainly these errors ensure that the follower will retry the request.
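For reference, a minimal sketch of the error-selection rule described above (hypothetical class and helper names; not the actual `Partition` code):
```java
import org.apache.kafka.common.protocol.Errors;

import java.util.Optional;
import java.util.Set;

public class ReplicaFetchValidationSketch {

    static Errors errorForReplicaFetch(Set<Integer> currentReplicaSet,
                                       int replicaId,
                                       Optional<Integer> currentLeaderEpoch) {
        if (currentReplicaSet.contains(replicaId)) {
            return Errors.NONE;
        }
        // The (replicaId, leaderEpoch) tuple is rejected: if the fetch carried a leader
        // epoch, answer UNKNOWN_LEADER_EPOCH; otherwise answer NOT_LEADER_OR_FOLLOWER.
        return currentLeaderEpoch.isPresent()
            ? Errors.UNKNOWN_LEADER_EPOCH
            : Errors.NOT_LEADER_OR_FOLLOWER;
    }

    public static void main(String[] args) {
        System.out.println(errorForReplicaFetch(Set.of(1, 2, 3), 4, Optional.of(5)));   // UNKNOWN_LEADER_EPOCH
        System.out.println(errorForReplicaFetch(Set.of(1, 2, 3), 4, Optional.empty())); // NOT_LEADER_OR_FOLLOWER
    }
}
```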
As a part of this patch, I have refactored the way that the leader updates follower fetch state. Previously, the process is a little convoluted. We send the fetch from `ReplicaManager` down to `Partition.readRecords`, then we iterate over the results and call `Partition.updateFollowerFetchState`. It is more straightforward to update state directly as a part of `readRecords`. All we need to do is pass through the `FetchParams`. This also prevents an unnecessary copy of the read results.
Reviewers: David Jacot <djacot@confluent.io>
This patch builds on #12072 and adds controller support for metadata.version. The kafka-storage tool now allows a
user to specify a specific metadata.version to bootstrap into the cluster, otherwise the latest version is used.
Upon the first leader election of the KRaft quorum, this initial metadata.version is written into the metadata log. When
writing snapshots, a FeatureLevelRecord for metadata.version will be written out ahead of other records so we can
decode things at the correct version level.
This also includes additional validation in the controller when setting feature levels. It will now check that a given
metadata.version is supportable by the quorum, not just the brokers.
Reviewers: José Armando García Sancio <jsancio@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>, Alyssa Huang <ahuang@confluent.io>
Replace the "RequestsPerSec" literal value with the pre-existing constant `RequestsPerSec`.
Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, Jason Gustafson <jason@confluent.io>
Enable KRaft in `AdminClientWithPoliciesIntegrationTest` and `PlaintextAdminIntegrationTest`. Some tests are not yet enabled or do not run as expected:
- testNullConfigs, see KAFKA-13863
- testDescribeCluster and testMetadataRefresh, currently we don't get the real controller in KRaft mode so the test may not run as expected
This patch also changes the exception type raised from invalid `IncrementalAlterConfig` requests with the `SUBTRACT` and `APPEND` operations. When the configuration value type is not a list, we now raise `INVALID_CONFIG` instead of `INVALID_REQUEST`.
Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
This patch ensures that followers don't have any remote replica states left over from previous leadership.
Reviewers: Jason Gustafson <jason@confluent.io>
The following variables in `KafkaController` are used for metrics:
```
offlinePartitionCount
preferredReplicaImbalanceCount
globalTopicCount
globalPartitionCount
topicsToDeleteCount
replicasToDeleteCount
ineligibleTopicsToDeleteCount
ineligibleReplicasToDeleteCount
```
When the controller goes from active to non-active, these variables will be reset to 0. Currently, this is done explicitly in `KafkaController.onControllerResignation()` and also after every loop iteration in `KafkaController.updateMetrics()`.
The first of these is redundant and can be removed. This patch fixes this and also simplifies `updateMetrics`.
Reviewers: Jason Gustafson <jason@confluent.io>
In the AlterConfigs/IncrementalAlterConfigs zk handler, we return `INVALID_REQUEST` and `INVALID_CONFIG` inconsistently. The problem is in `LogConfig.validate`. We may either throw `ConfigException` or `InvalidConfigException`. When the first of these is thrown, we catch it and convert to `INVALID_REQUEST`. If the latter is thrown, then we return `INVALID_CONFIG`. It seems more appropriate to return `INVALID_CONFIG` consistently, which is what the KRaft implementation already does. This patch fixes this and converts a few integration tests to KRaft.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
The documentation for `num.replica.fetchers` should emphasize the fact that the count applies to each source broker individually. Also mention the tradeoff.
Reviewers: Jason Gustafson <jason@confluent.io>
Convert EndToEndClusterIdTest, ConsumerGroupCommandTest,
ListConsumerGroupTest, and LogOffsetTest to test KRaft mode.
Reviewers: Jason Gustafson <jason@confluent.io>, dengziming <dengziming1993@gmail.com>
Since the StandardAuthorizer relies on the metadata log to store its ACLs, we need to be sure that
we have the latest metadata before allowing the authorizer to be used. However, if the authorizer
is not usable for controllers in the cluster, the latest metadata cannot be fetched, because
inter-node communication cannot occur. In the initial commit which introduced StandardAuthorizer,
we punted on the loading issue by allowing the authorizer to be used immediately. This commit fixes
that by implementing early.start.listeners as specified in KIP-801. This will let superusers in
immediately, but throw the new AuthorizerNotReadyException if non-superusers try to use the
authorizer before StandardAuthorizer#completeInitialLoad is called.
For the broker, we call StandardAuthorizer#completeInitialLoad immediately after metadata catch-up
is complete, right before unfencing. For the controller, we call
StandardAuthorizer#completeInitialLoad when the node has caught up to the high water mark of the
cluster metadata partition.
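For illustration, a minimal config sketch for the behavior described above (example values only; the listener name must match one of the node's configured listeners):
```java
import java.util.Properties;

public class EarlyStartListenersConfigSketch {
    // Illustrative settings: listeners named in early.start.listeners accept connections
    // before the authorizer's initial load completes; only super.users are allowed through
    // until StandardAuthorizer#completeInitialLoad is called.
    static Properties props() {
        Properties props = new Properties();
        props.put("authorizer.class.name", "org.apache.kafka.metadata.authorizer.StandardAuthorizer");
        props.put("super.users", "User:admin");
        props.put("early.start.listeners", "CONTROLLER");
        return props;
    }

    public static void main(String[] args) {
        props().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```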
This PR refactors the SocketServer so that it creates the configured acceptors and processors in
its constructor, rather than requiring a call to SocketServer#startup. A new function,
SocketServer#enableRequestProcessing, then starts the threads and begins listening on the
configured ports. enableRequestProcessing uses an async model: we will start the acceptor and
processors associated with an endpoint as soon as that endpoint's authorizer future is completed.
Also fix a bug where the controller and listener were sharing an Authorizer when in co-located
mode, which was not intended.
Reviewers: Jason Gustafson <jason@confluent.io>
Add a DeleteTopics test for all supported versions. Convert the
DeleteTopicsRequestTest to run against both ZK and KRaft mode.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>
Currently, we only wait for the metadataCache to record the partition info. This isn't enough; we should wait until the partition's ISR is smaller than its AR.
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <divijvaidya13@gmail.com>
This patch adds a new case class `FetchParams` which encapsulates the parameters of the fetch request. It then uses this class in `DelayedFetch` directly instead of `FetchMetadata`. The intent is to reduce the number of things we need to change whenever we need to pass through new parameters. The patch also cleans up `ReplicaManagerTest` for more consistent usage.
Reviewers: David Jacot <djacot@confluent.io>
We can append/subtract multiple config values in kraft mode using the `IncrementalAlterConfig` RPC. For example, appending to the topic config "cleanup.policy" with value="delete,compact" ends up treating "delete,compact" as a single value rather than two values. This patch fixes the problem. Additionally, it updates the zk logic to correctly handle duplicate additions.
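For illustration, a minimal client-side sketch of the append described above (the topic name and bootstrap address are placeholders):
```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class AppendMultipleConfigValuesExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // With the fix, the comma-separated value is split into two list entries
            // ("delete" and "compact") rather than stored as one literal string.
            AlterConfigOp appendBoth = new AlterConfigOp(
                new ConfigEntry("cleanup.policy", "delete,compact"), AlterConfigOp.OpType.APPEND);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(appendBoth))).all().get();
        }
    }
}
```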
Reviewers: Akhilesh Chaganti <akhileshchg@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This patch refactors the `Partition.makeLeader` and `Partition.makeFollower` to be robust to all partition updates from the KRaft metadata log. Particularly, it ensures the following invariants:
- A partition update is accepted if the partition epoch is equal to or newer than the current one. The partition epoch is also updated by the AlterPartition path, so we accept an update from the metadata log with the same partition epoch in order to fully update the partition state.
- The leader epoch start offset is only updated when the leader epoch is bumped.
- The follower states are only updated when the leader epoch is bumped.
- Fetchers are only restarted when the leader epoch is bumped. This was already the case but this patch adds unit tests to prove/maintain it.
At the same time, the patch unifies the state change logs to be similar in both the ZK and KRaft worlds.
Reviewers: Jason Gustafson <jason@confluent.io>
This PR tries to avoid the reinitialization of the leader epoch cache
and the partition metadata if the corresponding replica is being deleted.
With this change, the asyncDelete method can run more efficiently,
which means a StopReplica request with many partitions to be deleted can be
processed more quickly.
Reviewers: David Jacot <djacot@confluent.io>, Jun Rao <junrao@gmail.com>
The KRaft implementation of the `CreatePartitions` RPC ignores the `validateOnly` flag in the
request and creates the partitions if the validations are successful. This patch fixes the behavior
so that no partitions are created when the `validateOnly` flag is true.
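For illustration, a minimal client-side sketch of a validate-only request (topic name and bootstrap address are placeholders):
```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreatePartitionsOptions;
import org.apache.kafka.clients.admin.NewPartitions;

import java.util.Map;
import java.util.Properties;

public class ValidateOnlyCreatePartitionsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        try (Admin admin = Admin.create(props)) {
            // validateOnly(true) asks the controller to run the validations without
            // actually adding partitions; before this fix, the KRaft controller
            // created them anyway.
            admin.createPartitions(
                Map.of("my-topic", NewPartitions.increaseTo(6)),
                new CreatePartitionsOptions().validateOnly(true)
            ).all().get();
        }
    }
}
```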
Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, dengziming <dengziming1993@gmail.com>, Jason Gustafson <jason@confluent.io>
These tests belong to ApiVersionsResponseTest and were accidentally copied to MetadataVersionTest while working on #12072.
Reviewer: Luke Chen <showuon@gmail.com>
Refactoring ApiVersion to MetadataVersion to support both old IBP versioning and new KRaft versioning (feature flags)
for KIP-778.
IBP versions are now encoded as enum constants and explicitly prefixed with IBP_ instead of KAFKA_. The
LegacyApiVersion vs DefaultApiVersion distinction was not necessary and has been replaced with appropriate parsing rules for extracting
the correct shortVersions/versions.
Co-authored-by: David Arthur <mumrah@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Fix ClientQuotasRequestTest.testAlterClientQuotasBadIp so that it uses actually unresolvable hostnames.
The previous choices "ip" and "abc-123" are now resolvable.
Reviewers: David Jacot <djacot@confluent.io>, Andrew Choi <andrew.choi@uwaterloo.ca>, Divij Vaidya <divijvaidya13@gmail.com>
Since we have changed the `AlterIsr` API to `AlterPartition`, it makes sense to rename `AlterIsrManager` as well and some of the associated classes.
Reviewers: dengziming <dengziming1993@gmail.com>, David Jacot <djacot@confluent.io>
Adding KRaft and ZK params to ConfigCommandIntegrationTest wherever appropriate.
Reviewers: Kvicii <42023367+Kvicii@users.noreply.github.com>, dengziming <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
This patch refactors kafka.cluster.Replica, its usages, and its tests. This is part of the work in KAFKA-13790.
Reviewers: Jason Gustafson <jason@confluent.io>
Currently we validate the recovery state before checking the leader epoch in `KafkaController`. It seems more intuitive to validate the leader epoch first since the leader might be working with stale state, which is what we do in KRaft. This patch fixes this and adds a couple of additional validations to make the behavior consistent.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
If the wildcard * in a command is not wrapped in single quotes, bash expands it to the file names in the current folder. So we need to wrap it in single quotes. This patch updates the doc and the command option description.
Reviewers: dengziming <dengziming1993@gmail.com>, Luke Chen <showuon@gmail.com>
This patch does some initial cleanups in the context of KAFKA-13790. Mainly, it renames `ZkVersion` field to `PartitionEpoch` in the `LeaderAndIsrRequest`, the `LeaderAndIsr` and the `Partition`.
Reviewers: Jason Gustafson <jason@confluent.io>, dengziming <dengziming1993@gmail.com>
Fix two bugs related to dynamic broker configs in KRaft. The first bug is that we are calling reloadUpdatedFilesWithoutConfigChange when a topic configuration is changed, but not when a
broker configuration is changed. This is backwards. This function must be called only for broker
configs, and never for topic configs or cluster configs.
The second bug is that there were several configurations such as max.connections which are related
to broker listeners, but which do not involve changing the registered listeners. We should support
these configurations in KRaft. This PR fixes the configuration change validation to support this case.
Reviewers: Jason Gustafson <jason@confluent.io>, Matthew de Detrich <mdedetrich@gmail.com>
The reason this test is flaky is a race condition at the beginning of the test: while the brokers are starting up, the adminClient requests broker metadata. If the adminClient only gets partial metadata, the test will fail, because these tests shut brokers down to exercise leader election.
Fix this by explicitly waiting for the metadata cache to be up to date in waitForReadyBrokers, and by creating the admin client only after waitForReadyBrokers.
Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>, dengziming <dengziming1993@gmail.com>
Ensure that we can set log.flush.interval.ms at the broker or cluster level via
IncrementalAlterConfigs. This was broken by KAFKA-13749, which added log.flush.interval.ms as the
second synonym rather than the first. Add a regression test to DynamicConfigChangeTest.
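For illustration, a minimal client-side sketch of setting this config at the cluster level (the bootstrap address is a placeholder):
```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ClusterLevelFlushIntervalExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        try (Admin admin = Admin.create(props)) {
            // An empty broker resource name targets the cluster-wide default.
            ConfigResource clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp setFlushInterval = new AlterConfigOp(
                new ConfigEntry("log.flush.interval.ms", "10000"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(clusterDefault, List.of(setFlushInterval))).all().get();
        }
    }
}
```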
Create ControllerRequestContext and pass it to every controller API. This gives us a uniform way to
pass through information like the deadline (if there is one) and the Kafka principal which is
making the request (in the future we will want to log this information).
In ControllerApis, enforce a timeout for broker heartbeat requests which is equal to the heartbeat
request interval, to avoid heartbeats piling up on the controller queue. This should have been done
previously, but we overlooked it.
Add a builder for ClusterControlManager and ReplicationControlManager to avoid the need to deal
with a lot of churn (especially in test code) whenever a new constructor parameter gets added for
one of these.
In ControllerConfigurationValidator, create a separate function for when we just want to validate
that a ConfigResource is a valid target for DescribeConfigs. Previously we had been re-using the
validation code for IncrementalAlterConfigs, but this was messy.
Split out the replica placement code into a separate package and reorganize it a bit.
Reviewers: David Arthur <mumrah@gmail.com>
This PR includes the changes to feature flags that were outlined in KIP-778. Specifically, it
changes UpdateFeatures and FeatureLevelRecord to remove the maximum version level. It also adds
dry-run to the RPC so the controller can actually attempt the upgrade (rather than the client). It
introduces an upgrade type enum, which supersedes the allowDowngrade boolean. Because
FeatureLevelRecord was unused previously, we do not need to introduce a new version.
The kafka-features.sh tool was overhauled in KIP-778 and now includes the describe, upgrade,
downgrade, and disable sub-commands. Refer to
[KIP-778](https://cwiki.apache.org/confluence/display/KAFKA/KIP-778%3A+KRaft+Upgrades) for more
details on the new command structure.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>
In ZK mode, the topic "foo_bar" will conflict with "foo.bar" because of limitations in metric
names. We should implement the same collision check in KRaft mode. This PR also changes TopicCommandIntegrationTest to
support KRaft mode.
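For illustration, a small sketch of why the two names collide (assuming '.' is mapped to '_' for metric names; not the broker's actual validation code):
```java
public class TopicMetricNameCollisionSketch {
    // Metric names cannot contain '.', so topic names are sanitized by mapping '.' to '_'.
    static String metricSafeName(String topic) {
        return topic.replace('.', '_');
    }

    public static void main(String[] args) {
        // "foo.bar" and "foo_bar" sanitize to the same metric name, hence the conflict.
        System.out.println(metricSafeName("foo.bar").equals(metricSafeName("foo_bar"))); // true
    }
}
```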
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Verify that ReplicaManager.stopReplica is called if the stop replica
request doesn't result in a stale broker epoch error.
Reviewers: Mickael Maison <mimaison@users.noreply.github.com>
A validator is added to ProducerConfig.COMPRESSION_TYPE_CONFIG and KafkaConfig.CompressionTypeProp, and the corresponding test case is improved to verify that an invalid compression.type value throws a ConfigException.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
When migrating from EasyMock to Mockito, the Mockito implementation
didn't have the same semantics as the EasyMock implementation.
Without this fix, the mocking of LogSegment.size() always returned 0 because
a new AtomicInteger was created for each invocation of
LogSegment.size().
Reviewers: Mickael Maison <mimaison@users.noreply.github.com>
With KAFKA-13527 / KIP-784 we introduced a new top-level error code for
the DescribeLogDirs API for versions 3 and above. However, the change
regressed the error handling for versions less than 3 since the response
converter fails to write the non-zero error code out (rightly) for
versions lower than 3 and drops the response to the client which
eventually times out instead of receiving an empty log dirs response and
processing that as a Cluster Auth failure.
With this change, the API conditionally propagates the error code out to
the client if the request API version is 3 and above. This keeps the
semantics of the error handling the same for all versions and restores
the behavior for older versions.
See current behavior in the broker log:
```bash
[ERROR] 2022-04-08 01:22:56,406 [data-plane-kafka-request-handler-10] kafka.server.KafkaApis - [KafkaApi-0] Unexpected error handling request RequestHeader(apiKey=DESCRIBE_LOG_DIRS, apiVersion=0, clientId=sarama, correlationId=1) -- DescribeLogDirsRequestData(topics=null)
org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default errorCode at version 0
[ERROR] 2022-04-08 01:22:56,407 [data-plane-kafka-request-handler-10] kafka.server.KafkaRequestHandler - [Kafka Request Handler 10 on Broker 0], Exception when handling request
org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default errorCode at version 0
```
Reviewers: Ismael Juma <ismael@juma.me.uk>
The MetricsReporter.contextChange contract states the method should always be called before MetricsReporter.init is called. This is done correctly for reporters enabled by default (e.g. JmxReporter) but not for metrics reporters configured dynamically.
This fixes the call ordering for dynamically configured metrics reporters and updates tests to enforce the ordering.
Reviewers: David Jacot <djacot@confluent.io>
This PR makes it possible to limit the number of batches printed when inspecting log segments via the kafka-dump-log.sh script.
The idea is to take samples from the log segments without affecting a production cluster, since the current script reads whole files, which could cause performance issues.
Please see KIP-824 for details.
Reviewers: Jun Rao <junrao@gmail.com>
The test expects that the connection becomes idle before the mock time is moved forward, but the processor thread runs concurrently and may run some activity on the connection after the mock time is moved forward, thus the connection never expires.
The solution is to wait until the message is received on the socket, and only then wait until the connection is unmuted (it's not enough to wait for the unmute without waiting for the message to be received on the socket, because the channel might not have been muted yet).
Reviewers: David Jacot <djacot@confluent.io>
The current preferred read replica selection logic relies on `partition.leaderReplicaIdOpt` to determine whether the selection must be run. The issue is that `partition.leaderReplicaIdOpt` is defined for both the leader and the followers, so the logic runs all the time. The impact is not too bad because the leader is selected most of the time when the logic runs on a follower, and the leader is then filtered out. However, under certain rare conditions the selection on a follower could redirect the consumer to another follower. For instance, with the `RackAwareReplicaSelector`, the follower must have stale replica states from a previous leadership and have other followers in the same rack. Other implementations of the selection logic could be more affected.
This patch ensures that the preferred read replica selection is only run by the leader.
Reviewers: David Jacot <djacot@confluent.io>
Previously, when in KRaft mode, CreateTopics did not return the active configurations for the
topic(s) it had just created. This PR addresses that gap. We will now return these topic
configuration(s) when the user has DESCRIBE_CONFIGS permission. (In the case where the user does
not have this permission, we will omit the configurations and set TopicErrorCode. We will also omit
the number of partitions and replication factor data as well.)
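For illustration, a minimal client-side sketch of reading the returned configurations (topic name and bootstrap address are placeholders):
```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Properties;

public class CreateTopicsReturnsConfigsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        try (Admin admin = Admin.create(props)) {
            CreateTopicsResult result = admin.createTopics(
                List.of(new NewTopic("my-topic", 3, (short) 1)));
            // With this PR, a KRaft controller also returns the topic's active configuration,
            // provided the caller has DESCRIBE_CONFIGS permission.
            Config config = result.config("my-topic").get();
            config.entries().forEach(entry ->
                System.out.println(entry.name() + " = " + entry.value()));
        }
    }
}
```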
For historical reasons, we use different names to refer to each topic configuration when it is set
in the broker context, as opposed to the topic context. For example, the topic configuration
"segment.ms" corresponds to the broker configuration "log.roll.ms". Additionally, some broker
configurations have synonyms. For example, the broker configuration "log.roll.hours" can be used to
set the log roll time instead of "log.roll.ms". In order to track all of this, this PR adds a
table in LogConfig.scala which maps each topic configuration to an ordered list of ConfigSynonym
classes. (This table is then passed to KafkaConfigSchema as a constructor argument.)
Some synonyms require transformations. For example, in order to convert from "log.roll.hours" to
"segment.ms", we must convert hours to milliseconds. (Note that our assumption right now is that
topic configurations do not have synonyms, only broker configurations. If this changes, we will
need to add some logic to handle it.)
This PR makes the 8-argument constructor for ConfigEntry public. We need this in order to make full
use of ConfigEntry outside of the admin namespace. This change is probably inevitable in general
since otherwise we cannot easily test the output from various admin APIs in junit tests outside the
admin package.
Testing:
This PR adds PlaintextAdminIntegrationTest#testCreateTopicsReturnsConfigs. This test validates
some of the configurations that it gets back from the call to CreateTopics, rather than just checking
if it got back a non-empty map like some of the existing tests. In order to test the
configuration override logic, testCreateDeleteTopics now sets up some custom static and dynamic
configurations.
In QuorumTestHarness, we now allow tests to configure what the ID of the controller should be. This
allows us to set dynamic configurations for the controller in testCreateDeleteTopics. We will have
a more complete fix for setting dynamic configurations on the controller later.
This PR changes ConfigurationControlManager so that it is created via a Builder. This will make it
easier to add more parameters to its constructor without having to update every piece of test code
that uses it. It will also make the test code easier to read.
Reviewers: David Arthur <mumrah@gmail.com>
Add a note in the `control.plane.listener.name` doc to mention that the value can't be identical to `inter.broker.listener.name`.
Reviewers: Luke Chen <showuon@gmail.com>
Partitions are assigned to fetcher threads based on their hash modulo the number of fetcher threads. When we resize the fetcher thread pool, we basically re-distribute all the partitions based on the new fetcher thread pool size. The issue is that the logic that resizes the fetcher thread pool updates the `fetcherThreadMap` while iterating over it. The `Map` does not give any guarantee in this case - especially when the underlying map is re-hashed - which led to not iterating over all the fetcher threads during the process and thus left some partitions in the wrong fetcher threads.
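For illustration, a small sketch of the modulo-based assignment described above (the hash shown is an assumption, not the exact implementation):
```java
public class FetcherAssignmentSketch {
    // A partition's fetcher is chosen by hashing the topic-partition and taking it
    // modulo the pool size, so resizing the pool moves most partitions to a new fetcher.
    static int fetcherId(String topic, int partition, int numFetchers) {
        return Math.abs(31 * topic.hashCode() + partition) % numFetchers;
    }

    public static void main(String[] args) {
        System.out.println(fetcherId("my-topic", 7, 4)); // assignment with 4 fetchers
        System.out.println(fetcherId("my-topic", 7, 8)); // usually different with 8 fetchers
    }
}
```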
Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
In KIP-815 we replaced KafkaConsumer with AdminClient in GetOffsetShell. In the previous implementation, partitions were simply ignored if there was no offset for them; now we print -1 instead. This PR fixes the inconsistency.
Reviewers: David Jacot <djacot@confluent.io>, Luke Chen <showuon@gmail.com>
With major server components like the new quorum controller being moved outside of the `core` module, it is useful to have shared dependencies moved into `server-common`. An example of this is Yammer metrics which server components still rely heavily upon. All server components should have access to the default registry used by the broker so that new metrics can be registered and metric naming conventions should be standardized. This is particularly important in KRaft where we are attempting to recreate identically named metrics in the controller context. This patch takes a step in this direction. It moves `KafkaYammerMetrics` into `server-common` and it implements
standard metric naming utilities there.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Increase wait in ZooKeeperClientTest.testReinitializeAfterAuthFailure
so that the testcase of https://github.com/apache/kafka/pull/11563
actually fails without the corresponding source code fix.
Followup of https://issues.apache.org/jira/browse/KAFKA-13461.
Co-Authored-By: Gantigmaa Selenge <gantigmaa.selenge1@uk.ibm.com>
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
When a replica selector is configured, the partition leader computes a preferred read replica for any fetch from the consumers. When the preferred read replica is not the leader, the leader returns the preferred read replica with `FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)` to the `ReplicaManager`. This causes the fetch to go into the fetch purgatory because the exit conditions are not met. It turns out that the delayed fetch is not completed until the timeout is reached because the delayed fetch ignores partitions with an unknown offset (-1). If the fetch contains only one partition, the fetch is unnecessarily delayed by the timeout (500ms by default) just to inform the consumer that it has to read from a follower.
This patch fixes the issue by completing the fetch request immediately when a preferred read replica is defined.
Reviewers: David Jacot <djacot@confluent.io>
This patch adds display names for KRaft and ZK tests. Without this, it becomes hard to understand in Jenkins test reports which test failed. With this addition, it becomes more clear which method in the test suite fails.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
In this test, we have another thread that brings a broker down and up, to test whether the consumer can still work as expected. While the broker goes down and comes back up, we tried to verify that the assignment is as expected. But rebalances keep triggering while the broker is going down and up, so it doesn't make sense to verify the assignment there. Remove that check to make the test reliable.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Use the standard org.apache.kafka.common.KafkaException instead of kafka.common.KafkaException.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@confluent.io>
Show the LeaderRecoveryState in MetadataShell.
Fix a case where we were comparing a Byte type with an enum type.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Implement auto leader rebalance for KRaft by keeping track of the set of topic partitions which have a leader that is not the preferred replica. If this set is non-empty then schedule a leader balance event for the replica control manager.
When applying PartitionRecords and PartitionChangeRecords to the ReplicationControlManager, if the elected leader is not the preferred replica then remember this topic partition in the set of imbalancedPartitions.
Anytime the quorum controller processes a ControllerWriteEvent, it schedules a rebalance operation if there are no pending rebalance operations, the feature is enabled, and there are imbalanced partitions.
This KRaft implementation only supports the configuration properties auto.leader.rebalance.enable and leader.imbalance.check.interval.seconds. The configuration property leader.imbalance.per.broker.percentage is not supported and is ignored.
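For illustration, example controller settings for this feature (values are illustrative):
```java
import java.util.Properties;

public class KRaftAutoLeaderRebalanceConfigSketch {
    static Properties props() {
        Properties props = new Properties();
        props.put("auto.leader.rebalance.enable", "true");
        props.put("leader.imbalance.check.interval.seconds", "300");
        // leader.imbalance.per.broker.percentage is ignored by the KRaft implementation.
        return props;
    }

    public static void main(String[] args) {
        props().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```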
Reviewers: Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
Implementation of the protocol for starting and stopping leader recovery after an unclean leader election. This includes the management of state in the controllers (legacy and KRaft) and propagating this information to the brokers. This change doesn't implement log recovery after an unclean leader election.
Protocol Changes
================
For the topic partition state znode, the new field "leader_recovery_state" was added. If the field is missing the value is assumed to be RECOVERED.
ALTER_ISR was renamed to ALTER_PARTITION. The CurrentIsrVersion field was renamed to PartitionEpoch. The new field LeaderRecoveryState was added.
The new field LeaderRecoveryState was added to the LEADER_AND_ISR request. The inter-broker protocol version is used to determine which version to send to the brokers.
A new tagged field for LeaderRecoveryState was added to both the PartitionRecord and PartitionChangeRecord.
Controller
==========
For both the KRaft and legacy controller, the LeaderRecoveryState is set to RECOVERING if the leader was elected from outside the ISR, also known as an unclean leader election. The controller sets the state back to RECOVERED after receiving an ALTER_PARTITION request with version 0, or with version 1 and with the LeaderRecoveryState set to RECOVERED.
Both controllers preserve the leader recovery state even if the unclean leader goes offline and comes back online before a RECOVERED ALTER_PARTITION is sent.
The controllers reply with INVALID_REQUEST if the ALTER_PARTITION either:
1. Attempts to increase the ISR while the partition is still RECOVERING
2. Attempts to change the leader recovery state to RECOVERING from a RECOVERED state.
Topic Partition Leader
======================
The topic partition leader doesn't implement any log recovery in this change. The topic partition leader immediately marks the partition as RECOVERED and sends that state in the next ALTER_PARTITION request.
Reviewers: Jason Gustafson <jason@confluent.io>
It is possible to clean a segment partially if the offset map is filled before reaching the end of the segment. The highest offset that is reached becomes the new dirty offset after the cleaning completes. The data above this offset is nevertheless copied over to the new partially cleaned segment. Hence we need to ensure that the transaction index reflects aborted transactions from both the cleaned and uncleaned portion of the segment. Prior to this patch, this was not the case. We only collected the aborted transactions from the cleaned portion, which means that the reconstructed index could be incomplete. This can cause the aborted data to become effectively committed. It can also cause the deletion of the abort marker before the corresponding data has been removed (i.e. the aborted transaction becomes hanging).
Reviewers: Jun Rao <junrao@gmail.com>
I collected a list of the most flaky tests observed lately, checked / created their corresponding tickets, and marked them as ignored for now. Many of these failures are:
0. Failing very frequently in the past (at least in my observations).
1. not investigated for some time.
2. have a PR for review (mostly thanks to @showuon !), but not reviewed for some time.
Since 0), these tests failures are hindering our development; and from 1/2) above, people are either too busy to look after them, or honestly the tests are not considered as providing values since otherwise people should care enough to panic and try to resolve. So I think it's reasonable to disable all these tests for now. If we later learned our lesson a hard way, it would motivate us to tackle flaky tests more diligently as well.
I'm only disabling those tests that have been failing for a while; if no one has been looking into them for that long, I'm concerned that just gossiping about the flakiness would not bring people's attention to them either. So my psychological motivation is that "if people do not care about those failed tests for weeks (which is not a good thing! :P), let's teach ourselves the lesson the hard way when it indeed buries a bug that bites us, or not learn the lesson at all --- that indicates those tests are indeed not valuable". I did not disable tests that I only saw fail very recently.
Reviewers: John Roesler <vvcephei@apache.org>, Matthias J. Sax <mjsax@apache.org>, Luke Chen <showuon@gmail.com>, Randall Hauch <rhauch@gmail.com>
When a socket is closed, the corresponding channel should be retained only if there are complete buffered requests.
Reviewers: David Jacot <djacot@confluent.io>
Create KafkaConfigSchema to encapsulate the concept of determining the types of configuration keys.
This is useful in the controller because we can't import KafkaConfig, which is part of core. Also
introduce the TimelineObject class, which is a more generic version of TimelineInteger /
TimelineLong.
Reviewers: David Arthur <mumrah@gmail.com>
Issue:
Imagine a scenario where two threads T1 and T2 are inside UnifiedLog.flush() concurrently:
KafkaScheduler thread T1 -> The periodic work calls LogManager.flushDirtyLogs(), which in turn calls UnifiedLog.flush(). For example, this can happen due to log.flush.scheduler.interval.ms.
KafkaScheduler thread T2 -> A UnifiedLog.flush() call is triggered asynchronously during segment roll.
Suppose thread T1 advances the recovery point beyond the flush offset of thread T2; this could trip the check within LogSegments.values() for thread T2 when it is called from LocalLog.flush(). The exception causes the KafkaScheduler thread to die, which is not desirable.
Fix:
We fix this by ensuring that LocalLog.flush() is immune to the case where the recoveryPoint advances beyond the flush offset.
Reviewers: Jun Rao <junrao@gmail.com>
There seemed to be a little sloppiness in the integration tests in regard to admin client creation. Not only was there duplicated logic, but it wasn't always clear which listener the admin client was targeting. This made it difficult to tell in the context of authorization tests whether we were indeed testing with the right principal. As an example, we had a method in TestUtils which was using the inter-broker listener implicitly. This meant that the test was using the broker principal which had super user privilege. This was intentional, but I think it would be clearer to make the dependence on this listener explicit. This patch attempts to clean this up a bit by consolidating some of the admin creation logic and making the reliance on the listener clearer.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
There are a few integration tests for the forwarding logic which were added prior to kraft being ready for integration testing. Now that we have enabled kraft in integration tests, these tests are redundant and can be removed.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
In `KafkaServer`, `ZkConfigRepository` is just a wrapper around `zkClient`, so we don't need to create a new one.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch enables `ApiVersionsTest` to test both kraft brokers and controllers. It fixes a minor bug which caused the `Envelope` request to be exposed from `ApiVersions` requests to the kraft broker.
Reviewers: Jason Gustafson <jason@confluent.io>
The current naming of the fields in `ProducerIdsRecord` is a little confusing in regard to whether the block range was inclusive or exclusive. This patch tries to improve naming to make this clearer. In the record class, instead of `ProducerIdsEnd`, we use `NextProducerId`. We have also updated related classes such as `ProducerIdsBlock.java` with similar changes.
Reviewers: dengziming <dengziming1993@gmail.com>, David Arthur <mumrah@gmail.com>
In #11649, we fixed one permission inconsistency between kraft and zk authorization for the `CreatePartitions` request. Previously kraft was requiring `CREATE` permission on the `Topic` resource when it should have required `ALTER`. A second inconsistency is that kraft was also allowing `CREATE` on the `Cluster` resource, which is not supported in zk clusters and was not documented in KIP-195: https://cwiki.apache.org/confluence/display/KAFKA/KIP-195%3A+AdminClient.createPartitions. This patch fixes this inconsistency and adds additional test coverage for both cases.
Reviewers: José Armando García Sancio <jsancio@gmail.com>
This PR follows #11629 to enable `CreateTopicsRequestWithForwardingTest` and `CreateTopicsRequestWithPolicyTest` in KRaft mode.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch adds missing `equals` and `hashCode` implementations for `RawMetaProperties`. The storage tool relies on these for detecting when two log directories have different `meta.properties` files.
Reproduce current issue:
```shell
$ sed -i 's|log.dirs=/tmp/kraft-combined-logs|log.dirs=/tmp/kraft-combined-logs,/tmp/kraft-combined-logs2|' ./config/kraft/server.properties
$ ./bin/kafka-storage.sh format -t R19xNyxMQvqQRGlkGDi2cg -c ./config/kraft/server.properties
Formatting /tmp/kraft-combined-logs
Formatting /tmp/kraft-combined-logs2
$ ./bin/kafka-storage.sh info -c ./config/kraft/server.properties
Found log directories:
/tmp/kraft-combined-logs
/tmp/kraft-combined-logs2
Found metadata: {cluster.id=R19xNyxMQvqQRGlkGDi2cg, node.id=1, version=1}
Found problem:
Metadata for /tmp/kraft-combined-logs2/meta.properties was {cluster.id=R19xNyxMQvqQRGlkGDi2cg, node.id=1, version=1}, but other directories featured {cluster.id=R19xNyxMQvqQRGlkGDi2cg, node.id=1, version=1}
```
It reports that identical metadata is not the same...
With this fix:
```shell
$ ./bin/kafka-storage.sh info -c ./config/kraft/server.properties
Found log directories:
/tmp/kraft-combined-logs
/tmp/kraft-combined-logs2
Found metadata: {cluster.id=R19xNyxMQvqQRGlkGDi2cg, node.id=1, version=1}
```
Reviewers: Igor Soarez <soarez@apple.com>, Jason Gustafson <jason@confluent.io>
This patch ensures that the committed offsets are not expired while the group is rebalancing. The issue is that we can't rely on the subscribed topics if the group is not stable.
Reviewers: David Jacot <djacot@confluent.io>
Currently, when using KRaft mode, users still have to have an Apache ZooKeeper instance if they want to use AclAuthorizer. We should have a built-in Authorizer for KRaft mode that does not depend on ZooKeeper. This PR introduces such an authorizer, called StandardAuthorizer. See KIP-801 for a full description of the new Authorizer design.
Authorizer.java: add aclCount API as described in KIP-801. StandardAuthorizer is currently the only authorizer that implements it, but eventually we may implement it for AclAuthorizer and others as well.
ControllerApis.scala: fix a bug where createPartitions was authorized using CREATE on the topic resource rather than ALTER on the topic resource as it should have been.
QuorumTestHarness: rename the controller endpoint to CONTROLLER for consistency (the brokers already called it that). This is relevant in AuthorizerIntegrationTest where we are examining endpoint names. Also add the controllerServers call.
TestUtils.scala: adapt the ACL functions to be usable from KRaft, by ensuring that they use the Authorizer from the current active controller.
BrokerMetadataPublisher.scala: add broker-side ACL application logic.
Controller.java: add ACL APIs. Also add a findAllTopicIds API in order to make junit tests that use KafkaServerTestHarness#getTopicNames and KafkaServerTestHarness#getTopicIds work smoothly.
AuthorizerIntegrationTest.scala: convert over testAuthorizationWithTopicExisting (more to come soon)
QuorumController.java: add logic for replaying ACL-based records. This means storing them in the new AclControlManager object, and integrating them into controller snapshots. It also means applying the changes in the Authorizer, if one is configured. In renounce, when reverting to a snapshot, also set newBytesSinceLastSnapshot to 0.
Reviewers: YeonCheol Jang <YeonCheolGit@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
Title: KafkaConsumer cannot exit the poll method, and CPU usage and traffic on the broker side increase sharply.
Description: Local testing passed; the problem described in the JIRA is resolved.
JIRA link: https://issues.apache.org/jira/browse/KAFKA-13310
Reviewers: Luke Chen <showuon@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
After KAFKA-10793, we clear the findCoordinatorFuture in 2 places:
1. heartbeat thread
2. AbstractCoordinator#ensureCoordinatorReady
But in non-consumer-group mode with a group id provided (for offset commits, so a ConsumerCoordinator is created), there is no (1) heartbeat thread, and we only call (2) AbstractCoordinator#ensureCoordinatorReady the first time the consumer wants to fetch a committed offset position. That is, after the 2nd lookupCoordinator call, we have no chance to clear the findCoordinatorFuture, which causes the offset commit to never succeed.
To avoid the race condition mentioned in KAFKA-10793, it's not safe to clear the findCoordinatorFuture in the future listener. So we fix this issue by calling AbstractCoordinator#ensureCoordinatorReady when the coordinator is unknown in the non-consumer-group case, on each ConsumerCoordinator#poll.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
We allowed `maxProducerIdExpirationMs` and `time` to be optional in the `ProducerStateManager` constructor. We generally frown on optional arguments since it is too easy to overlook them. In this case, it was especially dangerous because the recently added `maxTransactionTimeoutMs` argument used the same type as `maxProducerIdExpirationMs`.
Reviewers: David Jacot <david.jacot@gmail.com>, Ismael Juma <ismael@juma.me.uk>
In v3.0, we changed the default value for `enable.idempotence` to true, but we didn't adjust the validator or the `idempotence`-enabled check method. So if a user didn't explicitly enable idempotence, the feature wasn't actually turned on. This patch addresses the problem, cleans up associated logic, and fixes tests that broke as a result of properly applying the new default (a sketch of the effective defaults follows the list below). Specifically it does the following:
1. fix the `ProducerConfig#idempotenceEnabled` method, to make it correctly detect if `idempotence` is enabled or not
2. remove some unnecessary config overrides and checks, since we already default the `acks`, `retries` and `enable.idempotence` configs.
3. move the config validator for the idempotent producer from `KafkaProducer` into `ProducerConfig`. The config validation should be the responsibility of `ProducerConfig` class.
4. add an `AbstractConfig#hasKeyInOriginals` method, to avoid copying the `originals` configs when we only want to check whether a key exists.
5. fix many broken tests. As mentioned, we didn't actually enable idempotence in v3.0, so after this PR some tests break due to behavioral differences between the idempotent and non-idempotent producer.
6. add additional tests to validate configuration behavior
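A small sketch of the effective defaults after the fix (illustrative; the actual resolution happens inside `ProducerConfig`):
```java
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class IdempotentDefaultsSketch {
    // With the fix, these are the effective defaults when the user sets none of them.
    static Properties effectiveDefaults() {
        Properties props = new Properties();
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");                                  // idempotence requires acks=all
        props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));   // and retries > 0
        return props;
    }

    public static void main(String[] args) {
        effectiveDefaults().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```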
Reviewers: Kirk True <kirk@mustardgrain.com>, Ismael Juma <ismael@juma.me.uk>, Mickael Maison <mimaison@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This PR updates the documentation and tooling to match https://github.com/apache/kafka/pull/10914, which added support for encoding `deleteHorizonMs` in the record batch schema. The changes include adding the new attribute and updating field names. We have also updated stale references to the old `FirstTimestamp` field in the code and comments. Finally, in the `DumpLogSegments` tool, when record batch information is printed, it also includes the value of `deleteHorizonMs` (e.g. `OptionalLong.empty` or `OptionalLong[123456]`).
Reviewers: Vincent Jiang <84371940+vincent81jiang@users.noreply.github.com>, Kvicii <42023367+Kvicii@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
The current error message suggests that controller.listener.names is a replacement for
control.plane.listener.name. This is incorrect since these configurations have very different
functions. This PR deletes the incorrect message.
Reviewers: David Jacot <david.jacot@gmail.com>, Kvicii