Use the standard org.apache.kafka.common.KafkaException instead of kafka.common.KafkaException.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@confluent.io>
Show the LeaderRecoveryState in MetadataShell.
Fix a case where we were comparing a Byte type with an enum type.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Implement auto leader rebalance for KRaft by keeping track of the set of topic partitions which have a leader that is not the preferred replica. If this set is non-empty then schedule a leader balance event for the replica control manager.
When applying PartitionRecords and PartitionChangeRecords to the ReplicationControlManager, if the elected leader is not the preferred replica then remember this topic partition in the set of imbalancedPartitions.
Any time the quorum controller processes a ControllerWriteEvent, it schedules a rebalance operation if there are no pending rebalance operations, the feature is enabled, and there are imbalanced partitions.
This KRaft implementation only supports the configuration properties auto.leader.rebalance.enable and leader.imbalance.check.interval.seconds. The configuration property leader.imbalance.per.broker.percentage is not supported and is ignored.
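A minimal sketch of the bookkeeping described above (class and method names are illustrative, not the actual ReplicationControlManager API):
```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: track partitions whose current leader is not the preferred
// (first) replica so that a rebalance event can be scheduled when appropriate.
class ImbalancedPartitionTracker {
    record TopicPartition(String topic, int partition) {}

    private final Set<TopicPartition> imbalancedPartitions = new HashSet<>();

    // Called when a PartitionRecord or PartitionChangeRecord is applied.
    void onPartitionChange(TopicPartition tp, int leader, int[] replicas) {
        if (leader != replicas[0]) {     // replicas[0] is the preferred replica
            imbalancedPartitions.add(tp);
        } else {
            imbalancedPartitions.remove(tp);
        }
    }

    // Consulted after each ControllerWriteEvent is processed.
    boolean shouldScheduleRebalance(boolean rebalanceEnabled, boolean rebalancePending) {
        return rebalanceEnabled && !rebalancePending && !imbalancedPartitions.isEmpty();
    }
}
```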
Reviewers: Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
Implementation of the protocol for starting and stopping leader recovery after an unclean leader election. This includes the management of state in the controllers (legacy and KRaft) and propagating this information to the brokers. This change doesn't implement log recovery after an unclean leader election.
Protocol Changes
================
For the topic partition state znode, the new field "leader_recovery_state" was added. If the field is missing the value is assumed to be RECOVERED.
ALTER_PARTITION was renamed from ALTER_ISR. The CurrentIsrVersion field was renamed to PartitionEpoch. The new field LeaderRecoveryState was added.
The new field LeaderRecoveryState was added to the LEADER_AND_ISR request. The inter-broker protocol version is used to determine which version to send to the brokers.
A new tagged field for LeaderRecoveryState was added to both the PartitionRecord and PartitionChangeRecord.
Controller
==========
For both the KRaft and legacy controllers, the LeaderRecoveryState is set to RECOVERING if the leader was elected from outside the ISR, also known as unclean leader election. The controller sets the state back to RECOVERED after receiving an ALTER_PARTITION request with version 0, or with version 1 and the LeaderRecoveryState set to RECOVERED.
Both controllers preserve the leader recovery state even if the unclean leader goes offline and comes back online before a RECOVERED ALTER_PARTITION is sent.
The controllers reply with INVALID_REQUEST if the ALTER_PARTITION request either (see the sketch after this list):
1. Attempts to increase the ISR while the partition is still RECOVERING
2. Attempts to change the leader recovery state to RECOVERING from a RECOVERED state.
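A minimal sketch of these two rules (hypothetical types and names, not the actual controller code):
```java
import java.util.Optional;

enum LeaderRecoveryState { RECOVERED, RECOVERING }

// Illustrative only: reject an ALTER_PARTITION request that grows the ISR while the
// partition is still RECOVERING, or that moves a RECOVERED partition back to RECOVERING.
final class AlterPartitionValidator {
    static Optional<String> validate(int currentIsrSize, LeaderRecoveryState currentState,
                                     int requestedIsrSize, LeaderRecoveryState requestedState) {
        if (currentState == LeaderRecoveryState.RECOVERING && requestedIsrSize > currentIsrSize) {
            return Optional.of("INVALID_REQUEST: cannot expand the ISR while RECOVERING");
        }
        if (currentState == LeaderRecoveryState.RECOVERED
                && requestedState == LeaderRecoveryState.RECOVERING) {
            return Optional.of("INVALID_REQUEST: cannot change RECOVERED back to RECOVERING");
        }
        return Optional.empty();
    }
}
```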
Topic Partition Leader
======================
The topic partition leader doesn't implement any log recovery in this change. The topic partition leader immediately marks the partition as RECOVERED and sends that state in the next ALTER_PARTITION request.
Reviewers: Jason Gustafson <jason@confluent.io>
It is possible to clean a segment partially if the offset map is filled before reaching the end of the segment. The highest offset that is reached becomes the new dirty offset after the cleaning completes. The data above this offset is nevertheless copied over to the new partially cleaned segment. Hence we need to ensure that the transaction index reflects aborted transactions from both the cleaned and uncleaned portion of the segment. Prior to this patch, this was not the case. We only collected the aborted transactions from the cleaned portion, which means that the reconstructed index could be incomplete. This can cause the aborted data to become effectively committed. It can also cause the deletion of the abort marker before the corresponding data has been removed (i.e. the aborted transaction becomes hanging).
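A simplified illustration of the fix's intent (hypothetical types, not the LogCleaner internals): the transaction index for the new segment must cover the full offset range that is copied over, not just the cleaned portion below the new dirty offset.
```java
import java.util.List;
import java.util.stream.Collectors;

final class AbortedTxnIndexSketch {
    record AbortedTxn(long producerId, long firstOffset, long lastOffset) {}

    // Collect aborted transactions overlapping [segmentBaseOffset, segmentEndOffset],
    // i.e. both the cleaned range and the uncleaned range copied into the new segment.
    static List<AbortedTxn> abortedTxnsForNewSegment(long segmentBaseOffset,
                                                     long segmentEndOffset,
                                                     List<AbortedTxn> allAbortedTxns) {
        return allAbortedTxns.stream()
                .filter(t -> t.lastOffset() >= segmentBaseOffset
                          && t.firstOffset() <= segmentEndOffset)
                .collect(Collectors.toList());
    }
}
```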
Reviewers: Jun Rao <junrao@gmail.com>
I collected a list of the most flaky tests observed lately, checked / created their corresponding tickets, and marked them as ignored for now. Many of these failing tests are:
0. Failing very frequently in the past (at least in my observations).
1. not investigated for some time.
2. have a PR for review (mostly thanks to @showuon !), but not reviewed for some time.
Because of 0), these test failures are hindering our development; and from 1) and 2) above, people are either too busy to look after them, or honestly the tests are not considered as providing value, since otherwise people would care enough to panic and try to resolve them. So I think it's reasonable to disable all these tests for now. If we later learn our lesson the hard way, it would motivate us to tackle flaky tests more diligently as well.
I'm only disabling tests that have been failing for a while; if no one has been looking into them for that long, I'm concerned that just gossiping about the flakiness would not bring people's attention to them either. So my psychological motivation is: "if people do not care about those failed tests for weeks (which is not a good thing! :P), let's teach ourselves the lesson the hard way when it indeed buries a bug that bites us, or not learn the lesson at all --- which would indicate those tests are indeed not valuable". Tests that I only saw fail very recently, I did not disable.
Reviewers: John Roesler <vvcephei@apache.org>, Matthias J. Sax <mjsax@apache.org>, Luke Chen <showuon@gmail.com>, Randall Hauch <rhauch@gmail.com>
When a socket is closed, the corresponding channel should be retained only if there are complete buffered requests.
Reviewers: David Jacot <djacot@confluent.io>
Create KafkaConfigSchema to encapsulate the concept of determining the types of configuration keys.
This is useful in the controller because we can't import KafkaConfig, which is part of core. Also
introduce the TimelineObject class, which is a more generic version of TimelineInteger /
TimelineLong.
Reviewers: David Arthur <mumrah@gmail.com>
Issue:
Imagine a scenario where two threads T1 and T2 are inside UnifiedLog.flush() concurrently:
KafkaScheduler thread T1 -> The periodic work calls LogManager.flushDirtyLogs() which in turn calls UnifiedLog.flush(). For example, this can happen due to log.flush.scheduler.interval.ms here.
KafkaScheduler thread T2 -> A UnifiedLog.flush() call is triggered asynchronously during segment roll here.
Suppose thread T1 advances the recovery point beyond the flush offset of thread T2; this could trip the check within LogSegments.values() here for thread T2, when it is called from LocalLog.flush() here. The exception causes the KafkaScheduler thread to die, which is not desirable.
Fix:
We fix this by ensuring that LocalLog.flush() is immune to the case where the recoveryPoint advances beyond the flush offset.
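A minimal sketch of the idea, assuming a simplified flush path (not the actual LocalLog/UnifiedLog code): the flush becomes a no-op when another thread has already advanced the recovery point past the requested offset, instead of tripping a segment range check.
```java
final class FlushSketch {
    private volatile long recoveryPoint = 0L;

    // Illustrative only: flush segments in [recoveryPoint, offset) and advance the
    // recovery point, tolerating a recovery point advanced concurrently by another thread.
    void flush(long offset) {
        long from = recoveryPoint;
        if (offset <= from) {
            return; // another flush already covered this range; nothing to do
        }
        flushSegmentsInRange(from, offset);
        recoveryPoint = Math.max(recoveryPoint, offset);
    }

    private void flushSegmentsInRange(long fromOffset, long toOffset) {
        // fsync the segments overlapping [fromOffset, toOffset) -- elided
    }
}
```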
Reviewers: Jun Rao <junrao@gmail.com>
There seemed to be a little sloppiness in the integration tests in regard to admin client creation. Not only was there duplicated logic, but it wasn't always clear which listener the admin client was targeting. This made it difficult to tell in the context of authorization tests whether we were indeed testing with the right principal. As an example, we had a method in TestUtils which was using the inter-broker listener implicitly. This meant that the test was using the broker principal which had super user privilege. This was intentional, but I think it would be clearer to make the dependence on this listener explicit. This patch attempts to clean this up a bit by consolidating some of the admin creation logic and making the reliance on the listener clearer.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
There are a few integration tests for the forwarding logic which were added prior to kraft being ready for integration testing. Now that we have enabled kraft in integration tests, these tests are redundant and can be removed.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
In `KafkaServer`, `ZkConfigRepository` is just a wrapper around `zkClient`, so we don't need to create a new one.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch enables `ApiVersionsTest` to test both kraft brokers and controllers. It also fixes a minor bug in how the `Envelope` API is expected to be exposed through `ApiVersions` requests to the kraft broker.
Reviewers: Jason Gustafson <jason@confluent.io>
The current naming of the fields in `ProducerIdsRecord` is a little confusing in regard to whether the block range was inclusive or exclusive. This patch tries to improve naming to make this clearer. In the record class, instead of `ProducerIdsEnd`, we use `NextProducerId`. We have also updated related classes such as `ProducerIdsBlock.java` with similar changes.
Reviewers: dengziming <dengziming1993@gmail.com>, David Arthur <mumrah@gmail.com>
In #11649, we fixed one permission inconsistency between kraft and zk authorization for the `CreatePartitions` request. Previously kraft was requiring `CREATE` permission on the `Topic` resource when it should have required `ALTER`. A second inconsistency is that kraft was also allowing `CREATE` on the `Cluster` resource, which is not supported in zk clusters and was not documented in KIP-195: https://cwiki.apache.org/confluence/display/KAFKA/KIP-195%3A+AdminClient.createPartitions. This patch fixes this inconsistency and adds additional test coverage for both cases.
Reviewers: José Armando García Sancio <jsancio@gmail.com>
This PR follows #11629 to enable `CreateTopicsRequestWithForwardingTest` and `CreateTopicsRequestWithPolicyTest` in KRaft mode.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch adds missing `equals` and `hashCode` implementations for `RawMetaProperties`. This is relied on by the storage tool for detecting when two log directories have different `meta.properties` files.
Reproduce current issue:
```shell
$ sed -i 's|log.dirs=/tmp/kraft-combined-logs|log.dirs=/tmp/kraft-combined-logs,/tmp/kraft-combined-logs2|' ./config/kraft/server.properties
$ ./bin/kafka-storage.sh format -t R19xNyxMQvqQRGlkGDi2cg -c ./config/kraft/server.properties
Formatting /tmp/kraft-combined-logs
Formatting /tmp/kraft-combined-logs2
$ ./bin/kafka-storage.sh info -c ./config/kraft/server.properties
Found log directories:
/tmp/kraft-combined-logs
/tmp/kraft-combined-logs2
Found metadata: {cluster.id=R19xNyxMQvqQRGlkGDi2cg, node.id=1, version=1}
Found problem:
Metadata for /tmp/kraft-combined-logs2/meta.properties was {cluster.id=R19xNyxMQvqQRGlkGDi2cg, node.id=1, version=1}, but other directories featured {cluster.id=R19xNyxMQvqQRGlkGDi2cg, node.id=1, version=1}
```
It's reporting that identical metadata is not the same...
With this fix:
```shell
$ ./bin/kafka-storage.sh info -c ./config/kraft/server.properties
Found log directories:
/tmp/kraft-combined-logs
/tmp/kraft-combined-logs2
Found metadata: {cluster.id=R19xNyxMQvqQRGlkGDi2cg, node.id=1, version=1}
```
Reviewers: Igor Soarez <soarez@apple.com>, Jason Gustafson <jason@confluent.io>
This patch ensures that the committed offsets are not expired while the group is rebalancing. The issue is that we can't rely on the subscribed topics if the group is not stable.
Reviewers: David Jacot <djacot@confluent.io>
Currently, when using KRaft mode, users still have to have an Apache ZooKeeper instance if they want to use AclAuthorizer. We should have a built-in Authorizer for KRaft mode that does not depend on ZooKeeper. This PR introduces such an authorizer, called StandardAuthorizer. See KIP-801 for a full description of the new Authorizer design.
Authorizer.java: add aclCount API as described in KIP-801. StandardAuthorizer is currently the only authorizer that implements it, but eventually we may implement it for AclAuthorizer and others as well.
ControllerApis.scala: fix a bug where createPartitions was authorized using CREATE on the topic resource rather than ALTER on the topic resource as it should have been.
QuorumTestHarness: rename the controller endpoint to CONTROLLER for consistency (the brokers already called it that). This is relevant in AuthorizerIntegrationTest where we are examining endpoint names. Also add the controllerServers call.
TestUtils.scala: adapt the ACL functions to be usable from KRaft, by ensuring that they use the Authorizer from the current active controller.
BrokerMetadataPublisher.scala: add broker-side ACL application logic.
Controller.java: add ACL APIs. Also add a findAllTopicIds API in order to make junit tests that use KafkaServerTestHarness#getTopicNames and KafkaServerTestHarness#getTopicIds work smoothly.
AuthorizerIntegrationTest.scala: convert over testAuthorizationWithTopicExisting (more to come soon)
QuorumController.java: add logic for replaying ACL-based records. This means storing them in the new AclControlManager object, and integrating them into controller snapshots. It also means applying the changes in the Authorizer, if one is configured. In renounce, when reverting to a snapshot, also set newBytesSinceLastSnapshot to 0.
Reviewers: YeonCheol Jang <YeonCheolGit@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
Title: KafkaConsumer cannot jump out of the poll method, and CPU and traffic on the broker side increase sharply
Description: The local test passed, and the problem described in the JIRA is resolved.
JIRA link: https://issues.apache.org/jira/browse/KAFKA-13310
Reviewers: Luke Chen <showuon@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
After KAFKA-10793, we clear the findCoordinatorFuture in 2 places:
1. heartbeat thread
2. AbstractCoordinator#ensureCoordinatorReady
But in non-consumer-group mode with a group id provided (for offset commits, so a ConsumerCoordinator is created), there is no (1) heartbeat thread, and we only call (2) AbstractCoordinator#ensureCoordinatorReady the first time the consumer wants to fetch the committed offset position. That is, after the 2nd lookupCoordinator call, we have no chance to clear the findCoordinatorFuture, which causes the offset commit to never succeed.
To avoid the race condition mentioned in KAFKA-10793, it's not safe to clear the findCoordinatorFuture in the future listener. So I think we can fix this issue by calling AbstractCoordinator#ensureCoordinatorReady when the coordinator is unknown in the non-consumer-group case, on each ConsumerCoordinator#poll.
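A rough sketch of the proposed flow (method names only approximate the consumer internals):
```java
// Illustrative only: when a group id is configured but group management (and therefore
// the heartbeat thread) is not in use, poll() itself must make sure the coordinator is
// known, since nothing else will clear a completed find-coordinator future.
final class CoordinatorPollSketch {
    private final boolean usesGroupManagement;
    private boolean coordinatorKnown = false;

    CoordinatorPollSketch(boolean usesGroupManagement) {
        this.usesGroupManagement = usesGroupManagement;
    }

    void poll() {
        if (!usesGroupManagement && !coordinatorKnown) {
            ensureCoordinatorReady();
        }
        // ... fetch committed offsets, send offset commits, etc.
    }

    private void ensureCoordinatorReady() {
        // Looks up the coordinator and clears any completed find-coordinator future.
        coordinatorKnown = true;
    }
}
```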
Reviewers: Guozhang Wang <wangguoz@gmail.com>
We allowed `maxProducerIdExpirationMs` and `time` to be optional in the `ProducerStateManager` constructor. We generally frown on optional arguments since it is too easy to overlook them. In this case, it was especially dangerous because the recently added `maxTransactionTimeoutMs` argument used the same type as `maxProducerIdExpirationMs`.
Reviewers: David Jacot <david.jacot@gmail.com>, Ismael Juma <ismael@juma.me.uk>
In v3.0, we changed the default value for `enable.idempotence` to true, but we didn't adjust the validator and the `idempotence` enabled check method. So if a user didn't explicitly enable idempotence, this feature won't be turned on. This patch addresses the problem, cleans up associated logic, and fixes tests that broke as a result of properly applying the new default. Specifically it does the following:
1. fix the `ProducerConfig#idempotenceEnabled` method, to make it correctly detect whether idempotence is enabled (a minimal sketch of the corrected check follows this list)
2. remove some unnecessary config overrides and checks, since we already default the `acks`, `retries` and `enable.idempotence` configs.
3. move the config validator for the idempotent producer from `KafkaProducer` into `ProducerConfig`. The config validation should be the responsibility of the `ProducerConfig` class.
4. add an `AbstractConfig#hasKeyInOriginals` method, to avoid copying the `originals` configs when we only want to check for the existence of a key.
5. fix many broken tests. As mentioned, we didn't actually enable idempotence in v3.0, so after this PR some tests break due to behavioral differences between the idempotent and non-idempotent producer.
6. add additional tests to validate configuration behavior
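A minimal sketch of the corrected check referenced in item 1, assuming simplified config handling (not the actual ProducerConfig code):
```java
import java.util.Map;

// Illustrative only: idempotence now defaults to true, so the check must read the
// effective value rather than requiring the user to have set it explicitly.
final class IdempotenceCheckSketch {
    static boolean idempotenceEnabled(Map<String, Object> effectiveConfigs) {
        Object value = effectiveConfigs.getOrDefault("enable.idempotence", true);
        return Boolean.parseBoolean(String.valueOf(value));
    }
}
```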
Reviewers: Kirk True <kirk@mustardgrain.com>, Ismael Juma <ismael@juma.me.uk>, Mickael Maison <mimaison@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This PR updates the documentation and tooling to match https://github.com/apache/kafka/pull/10914, which added support for encoding `deleteHorizonMs` in the record batch schema. The changes include adding the new attribute and updating field names. We have also updated stale references to the old `FirstTimestamp` field in the code and comments. Finally, in the `DumpLogSegments` tool, when record batch information is printed, it will also include the value of `deleteHorizonMs` (e.g. `OptionalLong.empty` or `OptionalLong[123456]`).
Reviewers: Vincent Jiang <84371940+vincent81jiang@users.noreply.github.com>, Kvicii <42023367+Kvicii@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
The current error message suggests that controller.listener.names is a replacement for
control.plane.listener.name. This is incorrect since these configurations have very different
functions. This PR deletes the incorrect message.
Reviewers: David Jacot <david.jacot@gmail.com>, Kvicii
`zookeeper.sync.time.ms` was previously used with the old Scala consumer, which
was removed in Apache Kafka 2.0.0. Remove the config definition from `KafkaConfig`
and documentation.
Reviewers: Luke Chen <showuon@gmail.com>, Ismael Juma <ismael@juma.me.uk>
With ddb6959c62, `Consumer::poll` will return an empty record batch when the position advances due to aborted transactions or control records. This makes the `ConsoleConsumer` exit because it assumes that `poll` returned due to the timeout being reached. This patch fixes this by explicitly tracking the timeout.
Reviewers: Jason Gustafson <jason@confluent.io>
Within a LogSegment, the TimeIndex and OffsetIndex are lazy indices that don't get created on disk until they are accessed for the first time. However, log recovery logic expects the presence of an offset index file on disk for each segment; otherwise, the segment is considered corrupted.
This PR introduces a forceFlushActiveSegment boolean for the log.flush function to allow the shutdown process to flush the empty active segment, which makes sure the offset index file exists.
Co-authored-by: Kowshik Prakasam <kowshik@gmail.com>
Reviewers: Jason Gustafson <jason@confluent.io>, Jun Rao <junrao@gmail.com>
In the fetch path, we check shouldLeaderThrottle regardless of whether the read is coming from a consumer or follower broker. This results in replication quota being applied to consumer fetches. This patch ensures that it is only applied to followers.
Reviewers: David Jacot <djacot@confluent.io>
The logic for log loading is encapsulated in `LogLoader`. Currently all the methods are static and we pass the parameters through a separate object `LogLoaderParams`. This patch simplifies this structure by turning `LogLoader` into a normal object and get rid of `LogLoaderParams`.
Reviewers: David Jacot <djacot@confluent.io>
Augments existing shutdown tests for KRaft. Adds the ability to update configs in KRaft tests,
and in both the ZK and KRaft cases to be able to update configs without losing the server's
log directory and data.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
The issue was quite subtle. It was due to a race for the `partitionMapLock` lock. `assertFetcherHasTopicId` would only succeed if it could acquire the lock before `processFetchRequest`. This PR refactors the test in order to make it more stable.
Reviewers: Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
Currently, KRaft does not support setting BROKER_LOGGER configs (it always fails.) Additionally,
there are several bugs in the handling of BROKER configs. They are not properly validated on the
forwarding broker, and the way we apply them is buggy as well. This PR fixes those issues.
KafkaApis: add support for doing validation and log4j processing on the forwarding broker. This
involves breaking the config request apart and forwarding only part of it. Adjust KafkaApisTest to
test the new behavior, rather than expecting forwarding of the full request.
MetadataSupport: remove MetadataSupport#controllerId since it duplicates the functionality of
MetadataCache#controllerId. Add support for getResourceConfig and maybeForward.
ControllerApis: log an error message if the handler throws an exception, just like we do in
KafkaApis.
ControllerConfigurationValidator: add JavaDoc.
Move some functions that don't involve ZK from ZkAdminManager to DynamicConfigManager. Move some
validation out of ZkAdminManager and into a new class, ConfigAdminManager, which is not tied to ZK.
ForwardingManager: add support for sending new requests, rather than just forwarding existing
requests.
BrokerMetadataPublisher: do not try to apply dynamic configurations for brokers other than the
current one. Log an INFO message when applying a new dynamic config, like we do in ZK mode. Also,
invoke reloadUpdatedFilesWithoutConfigChange when applying a new non-default BROKER config.
QuorumController: fix a bug in ConfigResourceExistenceChecker which prevented cluster configs from
being set. Add a test for this class.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
The issue is that when `zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic)` is called after the new controller is brought up, there is no guarantee that the controller has already written the topic id to the topic znode.
Reviewers: Jason Gustafson <jason@confluent.io>
If the user's `initTransactions` call times out, the user is expected to retry. However, the producer will continue retrying the `InitProducerId` request in the background. If it happens to return before the user retry of `initTransactions`, then the producer will raise an exception about an invalid state transition.
The patch fixes the issue by tracking the pending state transition until the user has acknowledged the operation's result. In the case of `initTransactions`, even if the `InitProducerId` returns in the background and the state changes, we can still retry the `initTransactions` call to obtain the result.
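For example, an application can safely retry a timed-out `initTransactions` call along these lines (illustrative snippet using the public producer API; the broker address and transactional id are placeholders):
```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class InitTransactionsRetryExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (true) {
                try {
                    producer.initTransactions();
                    break; // succeeded
                } catch (TimeoutException e) {
                    // With this patch, retrying no longer races with the background
                    // InitProducerId completion, so the retry is safe.
                }
            }
        }
    }
}
```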
Reviewers: David Jacot <djacot@confluent.io>
Implements KIP-788. The number of network threads can be set per listener using the following syntax:
listener.name.<listener>.num.network.threads=<num>
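For example, with hypothetical listener names, the override might look like this (shown as broker `Properties` purely for illustration):
```java
import java.util.Properties;

public class PerListenerNetworkThreadsExample {
    public static void main(String[] args) {
        Properties brokerProps = new Properties();
        brokerProps.put("listeners", "INTERNAL://:9092,EXTERNAL://:9093");
        brokerProps.put("num.network.threads", "3");                        // default for all listeners
        brokerProps.put("listener.name.external.num.network.threads", "8"); // override for EXTERNAL only
        brokerProps.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```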
Reviewers: Tom Bentley <tbentley@redhat.com>, Andrew Eugene Choi <andrew.choi@uwaterloo.ca>, David Jacot <djacot@confluent.io>
The KRaft controller should validate that the clusterID matches before allowing a broker to register in
the cluster.
Reviewers: José Armando García Sancio <jsancio@gmail.com>
Updated: This PR will reset the generation ID on an ILLEGAL_GENERATION error, since the member ID is still valid.
=====
Call resetStateAndRejoin on a REBALANCE_IN_PROGRESS error in sync group, to avoid an out-of-date ownedPartition.
== JIRA description ==
In KAFKA-13406, we found that a user got stuck in rebalancing with the cooperative sticky assignor. The reason is that the "ownedPartition" is out of date, and it failed the cooperative assignment validation.
Investigating deeper, I found the root cause is that we didn't reset the generation and state after a sync group failure. In KAFKA-12983, we fixed the issue that onJoinPrepare was not called in the resetStateAndRejoin method, which caused the ownedPartition to not get cleared. But there's another case where the ownedPartition will be out of date. Here's an example:
consumer A joined and synced group successfully with generation 1
A new rebalance started with generation 2; consumer A joined successfully, but somehow consumer A didn't send out its sync group request immediately
Other consumers completed the sync group successfully in generation 2, except consumer A.
After consumer A sent out its sync group request, a new rebalance started with generation 3, so consumer A got a REBALANCE_IN_PROGRESS error in the sync group response
On receiving REBALANCE_IN_PROGRESS, we re-join the group with generation 3, but with the assignment (ownedPartition) from generation 1.
So now we have sent an out-of-date ownedPartition, with unexpected results
We might want to call resetStateAndRejoin when a RebalanceInProgressException happens in sync group, because getting a sync group error means the join group passed, and the other consumers (and the leader) might have already completed this round of rebalance. The assignment distribution this consumer has is already out of date.
Reviewers: David Jacot <djacot@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Allow the leader epoch to be re-assigned to the new value from the Metadata response if `oldTopicId` is not present in the cache. This is needed because `oldTopicId` is removed from the cache if the topic gets deleted but the leader epoch is not removed. Hence, metadata for the newly recreated topic won't be accepted unless we allow `oldTopicId` to be null.
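A condensed sketch of the relaxed acceptance rule (simplified; not the actual metadata cache code):
```java
// Illustrative only: accept the leader epoch from the Metadata response when no old
// topic id is cached (e.g. the topic was deleted and recreated) or the topic id changed;
// otherwise require the epoch to be monotonically non-decreasing.
final class EpochUpdateSketch {
    static boolean shouldAcceptNewEpoch(String oldTopicId, String newTopicId,
                                        int currentEpoch, int newEpoch) {
        if (oldTopicId == null || !oldTopicId.equals(newTopicId)) {
            return true;
        }
        return newEpoch >= currentEpoch;
    }
}
```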
Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>
This patch tightens the configuration checks related to KRaft configs by adding the following constraints (a hypothetical configuration consistent with them is sketched after the list):
* `control.plane.listener.name` is confirmed to be empty in KRaft mode whenever a config object is created as opposed to later when the broker is given the config and tries to start.
* `controller.listener.names` is required to be empty for the non-KRaft (i.e. ZooKeeper) case. A ZooKeeper-based cluster that sets this config will fail to restart until this config is removed.
* There must be no advertised listeners when running just a KRaft controller (i.e. when `process.roles=controller`). This means neither `listeners` nor `advertised.listeners` (if the latter is explicitly defined) can contain a listener that does not also appear in `controller.listener.names`.
* When running a KRaft broker (i.e. when `process.roles=broker` or `process.roles=broker,controller`), advertised listeners (which was already checked to be non-empty via the check that the inter-broker listener appear there) must not include any listeners appearing in `controller.listener.names`.
* When running a KRaft controller (i.e. when `process.roles=controller` or `process.roles=broker,controller`) `controller.listener.names` must be non-empty and every one must appear in `listeners`
* When running just a KRaft broker (i.e. when `process.roles=broker`) `controller.listener.names` must be non-empty and none of them can appear in `listeners`. This was indirectly checked previously, but the indirect checks did not catch all cases.
* When running just a KRaft broker we log a warning if more than one entry appears in `controller.listener.names` because only the first entry is used.
* We also map configured controller listener names to the `PLAINTEXT` security protocol by default provided that the security mapping is empty and no other security protocols are in use.
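A hypothetical broker-only configuration consistent with these constraints (values are placeholders, shown as `Properties` purely for illustration): the controller listener is named but neither bound nor advertised by the broker.
```java
import java.util.Properties;

public class KraftBrokerListenersExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("process.roles", "broker");
        props.put("node.id", "2");
        props.put("controller.quorum.voters", "1@localhost:9093");
        props.put("listeners", "PLAINTEXT://localhost:9092");
        props.put("advertised.listeners", "PLAINTEXT://localhost:9092");
        // Named here, but intentionally absent from listeners/advertised.listeners;
        // with an empty security map it is mapped to PLAINTEXT by default.
        props.put("controller.listener.names", "CONTROLLER");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```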
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
Require that topics exist before topic configurations can be created for them.
Merge the code from ConfigurationControlManager#checkConfigResource into
ControllerConfigurationValidator to avoid duplication.
Add KRaft support to DynamicConfigChangeTest.
Split out tests in DynamicConfigChangeTest that don't require a cluster into
DynamicConfigChangeUnitTest to save test time.
Reviewers: David Arthur <mumrah@gmail.com>
Kafka prints duplicate configuration information in the logs during startup; this repeated printing can confuse users. It is better to add log messages before and after the repeated configuration information.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
For CreateTopics, fix a bug where if one topic in a batch failed to be created, they would all fail with
the same error code. Make the error message for TOPIC_ALREADY_EXISTS consistent with the ZK-based
code by including the topic name.
For IncrementalAlterConfigs, before we allow topic configurations to be set, we should check that
they are valid. (This also applies to newly created topics.) IncrementalAlterConfigs should ignore
non-null payloads for DELETE operations. Previously we would return an error in these cases.
However, this is not compatible with the old ZK-based code, which ignores the payload in these
cases.
Reviewers: José Armando García Sancio <jsancio@gmail.com>, Jason Gustafson <jason@confluent.io>
If the JAAS configuration does not contain a Client section for ZK clients, an auth failure event is generated. If this occurs after the connection is set up in the controller, we schedule reinitialize(), which causes the controller to resign. In the case where SASL is not mandatory and the connection is alive, the controller maintains the current session and doesn't register its watchers, leaving it in a bad state.
Reviewers: Jun Rao <junrao@gmail.com>
Change the snapshot API so that SnapshotWriter and SnapshotReader are interfaces. Rename the existing SnapshotWriter and SnapshotReader types and have them implement the interfaces introduced by this commit.
Co-authored-by: loboxu <loboxu@tencent.com>
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
We now use 2MB as with the other test harnesses.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Colin Patrick McCabe <cmccabe@confluent.io>, Luke Chen <showuon@gmail.com>
Author: Colin P. McCabe <cmccabe@confluent.io>
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Sherzod Mamadaliev <mamadaliev@yahoo.com>
Closes #11457 from cmccabe/guard_against_exit
When creating snapshots, controllers generate a ProducerIdsRecord indicating the highest producer ID
that has been used so far. Brokers should generate the same record, so that the snapshots can be
compared.
Also, fix a bug in MetadataDelta#finishSnapshot. The current logic will produce the wrong result if
all objects of a certain type are completely removed in the snapshot. The fix is to unconditionally
create each delta object.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
This patch ensures that SocketChannel in Acceptor#accept is closed if an IOException is thrown while the socket is configured.
Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
Leader election and resignation logic for the Group Coordinator and Transaction Coordinator is the
same. Share this logic by refactoring this code into a method.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
The KRaft brokers should not list the names in `controller.listener.names` in `listeners` because brokers do not bind to those endpoints. This commit also removes the extra changes to the security protocol map because the `PLAINTEXT` protocol doesn't require additional configuration.
To fully support all of the security protocol configuration additional changes to `QuorumTestHarness` are needed. Those changes can be made when migrating integration tests that need this functionality.
Reviewers: Ron Dagostino <rdagostino@confluent.io>, Jason Gustafson <jason@confluent.io>
With KAFKA-13102, we added topic IDs to the InitialFetchState and the PartitionFetchState in order to send fetch requests using topic IDs when IBP is 3.1.
However, there are some cases where we could initially send topic IDs from the controller and then no longer do so (the controller changes to an IBP < 2.8). If we do not remove the topic IDs from the PartitionFetchState and one broker is still on IBP 3.1, it will try to send a version 13 fetch request to brokers that no longer have topic IDs in the metadata cache. This could leave the cluster in a state where it is unable to fetch from these partitions.
This patch removes the topic IDs from the PartitionFetchState if the log contains a topic ID but the request does not. This means that we will always handle a LeaderAndIsr request if there is no ID in the request but an ID in the log.
Such a state should be transient because we are either
* upgrading the cluster and somehow switched between a new IBP controller and an old one --> and will eventually have all new IBP controllers/brokers.
* downgrading the cluster --> will eventually have all old IBP controllers/brokers and will restart the broker/delete the partition metadata file for them.
Reviewers: David Jacot <djacot@confluent.io>
The sasl.oauthbearer.jwks.endpoint.retry.backoff.ms and sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms configuration options were added to the SaslConfig class but their default values were not added to KafkaConfig. As a result, when the OAuth validation feature is enabled in the broker and those two configuration values aren't explicitly provided by the user, the broker exits. This patch fixes the issue by defining them in the KafkaConfig class.
Reviewers: David Jacot <djacot@confluent.io>
With the changes for topic IDs, we have a different flow. When a broker receives a request, it uses a map to convert the topic ID to topic names. If the topic ID is not found in the map, we return a top level error and close the session. This decision was motivated by the difficulty of storing “unresolved” partitions in the session. In earlier iterations we stored an “unresolved” partition object in the cache, but it was somewhat hard to reason about and required extra logic to try to resolve the topic ID on each incremental request and add it to the session. It also required extra logic to forget the topic (either by topic ID if the topic name was never known, or by topic name if it was finally resolved when we wanted to remove it from the session).
One helpful simplifying factor is that we only allow one type of request (uses topic ID or does not use topic ID) in the session. That means we can rely on a session continuing to have the same information. We don’t have to worry about converting topics only known by name to topic ID for a response and we won’t need to convert topics only known by ID to name for a response.
This PR introduces a change to store the "unresolved partitions" in the cached partition object. If a version 13+ request is sent with a topic ID that is unknown, a cached partition will be created with that fetch request data and a null topic name. On subsequent incremental requests, unresolved partitions may be resolved with the new IDs found in the metadata cache. When handling the request, getting all partitions will return a TopicIdPartition object that will be used to handle the request and build the response. Since we can rely on only one type of request (with IDs or without), the cached partitions map will have different keys depending on what fetch request version is being used.
This PR involves changes both in FetchSessionHandler and FetchSession. Some major changes are outlined below.
1. FetchSessionHandler: Forgetting a topic and adding a new topic with the same name - We may have a case where there is a topic foo with ID 1 in the session. Upon a subsequent metadata update, we may have topic foo with ID 2. This means that topic foo has been deleted and recreated. When sending fetch requests version 13+ we will send a request to add foo ID 2 to the session and remove foo ID 1. Otherwise, we will fall back to the same behavior for versions 12 and below
2. FetchSession: Resolving in Incremental Sessions - Incremental sessions contain two distinct sets of partitions. Partitions that are sent in the latest request that are new/updates/forgotten partitions and the partitions already in the session. If we want to resolve unknown topic IDs we will need to handle both cases.
* Partitions in the request - These partitions are either new or updating/forgetting previous partitions in the session. The new partitions are trivial. We either have a resolved partition or create a partition that is unresolved. For the other cases, we need to be a bit more careful.
* For updated partitions we have a few cases – keep in mind, we may not programmatically know if a partition is an update:
1. partition in session is resolved, update is resolved: trivial
2. partition in session is unresolved, update is unresolved: in code, this is equivalent to the case above, so trivial as well
3. partition in session is unresolved, update is resolved: this means the partition in the session does not have a name, but the metadata cache now contains the name – to fix this we can check if there exists a cached partition with the given ID and update it both with the partition update and with the topic name.
4. partition in session is resolved, update is unresolved: this means the partition in the session has a name, but the update was unable to be resolved (ie, the topic is deleted) – this is the odd case. We will look up the partition using the ID. We will find the old version with a name but will not replace the name. This will lead to an UNKNOWN_TOPIC_OR_PARTITION or INCONSISTENT_TOPIC_ID error which will be handled with a metadata update. Likely a future request will forget the partition, and we will be able to do so by ID.
5. Two partitions in the session have IDs, but they are different: only one topic ID should exist in the metadata at a time, so likely only one topic ID is in the fetch set. The other one should be in the toForget. We will be able to remove this partition from the session. If for some reason, we don't try to forget this partition — one of the partitions in the session will cause an inconsistent topic ID error and the metadata for this partition will be refreshed — this should result in the old ID being removed from the session. This should not happen if the FetchSessionHandler is correctly in sync.
* For the forgotten partitions we have the same cases:
1. partition in session is resolved, forgotten is resolved: trivial
2. partition in session is unresolved, forgotten is unresolved: in code, this is equivalent to the case above, so trivial as well
3. partition in session is unresolved, forgotten is resolved: this means the partition in the session does not have a name, but the metadata cache now contains the name – to fix this we can check if there exists a cached partition with the given ID and try to forget it before we check the resolved name case.
4. partition in session is resolved, forgotten is unresolved: this means the partition in the session has a name, but the forgotten partition was unable to be resolved (i.e., the topic is deleted). We will look up the partition using the ID. We will find the old version with a name and be able to delete it.
5. both partitions in the session have IDs, but they are different: This should be the same case as described above. If we somehow do not have the ID in the session, no partition will be removed. This should not happen unless the Fetch Session Handler is out of sync.
* Partitions in the session - there may be some partitions in the session already that are unresolved. We can resolve them in forEachPartition using a method that checks if the partition is unresolved and tries to resolve it using a topicName map from the request. The partition will be resolved before the function using the cached partition is applied.
Reviewers: David Jacot <djacot@confluent.io>
This test was disabled in af8100b94f. The reason the test was failing is that it assumes that the reference to `servers` can be mutated directly. The implementation in `IntegrationTestHarness` is intended to allow this by returning a mutable buffer, but the implementation actually returns a copy of the underlying collection. This caused the test case to create multiple `KafkaServer` instances instead of one as intended because it was modifying the copy. This led to the broker registration failure.
Reviewers: David Jacot <djacot@confluent.io>
This patch fixes a bug in `DynamicBrokerConfig` which causes some configuration changes to be ignored. In particular, the bug is the result of the reference to the old configuration getting indirectly mutated prior to the call to `BrokerReconfigurable.reconfigure`. This causes the first dynamic configuration update to pass effectively the same configuration as both `oldConfig` and `newConfig`. In cases such as in `DynamicThreadPool`, the update is ignored because the old configuration value matches the new configuration value.
This bug only affects KRaft. It is protected in the zk broker by the call to `DynamicBrokerConfig.initialize()`, which overwrites the stored reference to the original configuration. The patch fixes the problem by ensuring that `initialize()` is also invoked in KRaft when `BrokerServer` starts up.
Reviewers: David Jacot <djacot@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
Change ZooKeeperTestHarness to QuorumTestHarness so that integration tests which inherit from
this class can test Kafka in both ZK and KRaft mode. Test cases which do this can specify the
modes they support by including a ParameterizedTest annotation before each test case, like the
following:
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testValidCreateTopicsRequests(quorum: String): Unit = { ... }
For each value that is specified here (zk, kraft), the test case will be run once in the
appropriate mode. So the test shown above is run twice. This allows integration tests to be
incrementally converted over to support KRaft mode, rather than rewritten to support it. For
now, test cases which do not specify a quorum argument will continue to run only in ZK mode.
JUnit5 makes the quorum annotation visible in the TestInfo object which each @BeforeEach
function in a test can optionally take. Therefore, this PR converts over the setUp function of
the quorum base class, plus every derived class, to take a TestInfo argument. The TestInfo
object gets "passed up the chain" to the base class, where it determines which quorum type we
create (ZK or KRaft). In a few cases, I discovered test cases inheriting from the test harness
that had more than one @BeforeEach function. Because the JUnit5 framework does not define the
order in which @BeforeEach hooks are run, I changed these to overload setUp() instead, to avoid
undefined behavior.
The general approach taken here is to make as much as possible work with KRaft, but to leave some
things as ZK-only when appropriate. For example, a test that explicitly requests an AdminZkClient
object will get an exception if it is running in KRaft mode. Similarly, tests which explicitly
request KafkaServer rather than KafkaBroker will get an exception when running in KRaft mode.
As a proof of concept, this PR converts over kafka.api.MetricsTest to support KRaft.
This PR also renames the quorum controller event handler thread to include the text
"QuorumControllerEventHandler". This allows QuorumTestHarness to check for hanging quorum
controller threads, as it does for hanging ZK-based controller threads.
Finally, ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
caused many failing test runs. Therefore, I disabled it here and filed KAFKA-13421 to fix the
test logic to be more reliable.
Reviewers: Jason Gustafson <jason@confluent.io>, Igor Soarez <soarez@apple.com>
This task is to provide a concrete implementation of the interfaces defined in KIP-255 to allow Kafka to connect to an OAuth/OIDC identity provider for authentication and token retrieval. While KIP-255 provides an unsecured JWT example for development, this will fill in the gap and provide a production-grade implementation.
The OAuth/OIDC work will allow out-of-the-box configuration by any Apache Kafka users to connect to an external identity provider service (e.g. Okta, Auth0, Azure, etc.). The code will implement the standard OAuth client credentials grant type.
The proposed change is largely composed of a pair of AuthenticateCallbackHandler implementations: one to login on the client and one to validate on the broker.
See the following for more detail:
KIP-768
KAFKA-13202
Reviewers: Yi Ding <dingyi.zj@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Put ZkMetadataCache in the kafka.server.metadata package rather than the kafka.server package, so
that its package is consistent with its position in the source directory hierarchy.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Make TestUtils usable for KRaft mode by using KafkaBroker instead of KafkaServer where appropriate,
and adding some alternate functions that use AdminClient instead of ZooKeeper.
Reviewers: Jason Gustafson <jason@confluent.io>
When loading a snapshot, the broker's BrokerMetadataListener was using the batch's append time, offset
and epoch. These are not the same as the append time, offset and epoch from the log. This PR fixes
it to instead use the lastContainedLogTimeStamp, lastContainedLogOffset and lastContainedLogEpoch
from the SnapshotReader.
This PR refactors the MetadataImage and MetadataDelta to include an offset and epoch. It also swaps
the order of the arguments for ReplicaManager.applyDelta, in order to be more consistent with
MetadataPublisher.publish.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds and
KRaftMetadataCache#topicIdsToNames by returning a map subclass that
exposes the TopicsImage data structures without copying them.
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Previously, we used the metadata cache to determine whether or not to use topic IDs. Unfortunately, metadata cache updates with ZK controllers are in a separate request and may be too slow for the fetcher thread. This results in switching between topic names and topic IDs for topics that could just use IDs.
This patch adds topic IDs to FetcherState created in LeaderAndIsr requests. It also supports updating this state for follower threads as soon as a LeaderAndIsr request provides a topic ID.
We've opted to only update replica fetcher threads. AlterLogDir threads will use either topic name or topic ID depending on what was present when they were created.
Reviewers: David Jacot <djacot@confluent.io>
This also fixes KAFKA-13070.
We have seen a problem caused by shutting down the scheduler before shutting down LogManager.
When LogManager was closing partitions one by one, the scheduler called to delete old segments due to retention. However, the old segments could have been closed by the LogManager, which caused an exception and subsequently marked the logdir as offline. As a result, the broker didn't flush the remaining partitions and didn't write the clean shutdown marker. Ultimately the broker took hours to recover the log during restart.
This PR essentially reverts #10538
Reviewers: Ismael Juma <ismael@juma.me.uk>, Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
The `LastTimestamp` field is useful because its value is present even when there are no data batches written by a given producerId.
Reviewers: David Jacot <djacot@confluent.io>
Add support for CreateTopicsPolicy and AlterConfigsPolicy when running in KRaft mode.
Reviewers: David Arthur <mumrah@gmail.com>, Niket Goel <ngoel@confluent.io>
Internal topic configs with default values are not included in the response of CreateTopic/DescribeTopic. However, if they are explicitly set, they will be included in the response.
Reviewers: Jun Rao <junrao@gmail.com>
This patch fixes a deadlock when incrementing the high watermark after the synchronous zk ISR modification happens. The main difference is that we prevent the callback from executing while under the leader and ISR lock. The deadlock bug was introduced in https://github.com/apache/kafka/pull/11245.
Reviewers: David Jacot <djacot@confluent.io>
The ReplicaManager, LogManager, and KafkaApis class all have many
constructor parameters. It is often difficult to add or remove a
parameter, since there are so many locations that need to be updated. In
order to address this problem, we should use named parameters when
constructing these objects from Scala code. This will make it easy to
add new optional parameters without modifying many test cases. It will
also make it easier to read git diffs and PRs, since the parameters will
have names next to them. Since Java does not support named parameters,
this PR adds several Builder classes which can be used to achieve the
same effect.
ReplicaManager also had a secondary constructor, which this PR removes.
The function of the secondary constructor was just to provide some
default parameters for the main constructor. However, it is simpler just
to actually use default parameters.
Reviewers: David Arthur <mumrah@gmail.com>
This PR aims to remove tombstones that persist indefinitely due to low throughput. Previously, deleteHorizon was calculated from the segment's last modified time.
In this PR, the deleteHorizon will now be tracked in the baseTimestamp of RecordBatches. After the first cleaning pass that finds a record batch with tombstones, the record batch is recopied with the deleteHorizon flag set and a new baseTimestamp that is the deleteHorizonMs. The records in the batch are rebuilt with relative timestamps based on the deleteHorizonMs that is recorded. Later cleaning passes will be able to remove tombstones more accurately on their deleteHorizon due to the individual time tracking on record batches.
KIP 534: https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+and+transaction+markers+for+approximately+delete.retention.ms+milliseconds
Co-authored-by: Ted Yu <yuzhihong@gmail.com>
Co-authored-by: Richard Yu <yohan.richard.yu@gmail.com>
This patch fixes several problems with the `ElectLeaders` API in KRaft:
- `KafkaApis` did not properly forward this request type to the controller.
- `ControllerApis` did not handle the request type.
- `ElectLeadersRequest.getErrorResponse` may raise NPE when `TopicPartitions` is null.
- Controller should not do preferred election if `ElectLeaders` specifies `UNCLEAN` election.
- Controller should not do unclean election if `ElectLeaders` specifies `PREFERRED` election.
- Controller should use proper error codes to handle cases when desired leader is unavailable or when no election is needed because a desired leader is already elected.
- When election for all partitions is requested (indicated with null `TopicPartitions` field), the response should not return partitions for which no election was necessary.
In addition to extending the unit test coverage in `ReplicationControlManagerTest`, I have also converted `LeaderElectionCommandTest` to use KRaft.
Reviewers: dengziming <swzmdeng@163.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, David Arthur <mumrah@gmail.com>
Some plugins make use of KafkaConfig#originals rather than the
KafkaConfig object. We should ensure that these plugins see the
correct value for broker.id if the broker is running in KRaft mode and
node.id has been configured, but not broker.id.
This PR does this by ensuring that both node.id and broker.id are set in
the originals map if either one is set. We also check that they are set
to the same value in KafkaConfig#validateValues.
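A simplified sketch of that propagation (not the actual KafkaConfig code):
```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: make sure plugins reading the originals map see both keys, and
// reject configurations where the two ids disagree.
final class BrokerIdPropagationSketch {
    static Map<String, Object> withBrokerIdAndNodeId(Map<String, Object> originals) {
        Map<String, Object> props = new HashMap<>(originals);
        Object nodeId = props.get("node.id");
        Object brokerId = props.get("broker.id");
        if (nodeId != null && brokerId == null) {
            props.put("broker.id", nodeId);
        } else if (brokerId != null && nodeId == null) {
            props.put("node.id", brokerId);
        } else if (nodeId != null && !String.valueOf(nodeId).equals(String.valueOf(brokerId))) {
            throw new IllegalArgumentException("broker.id and node.id must be set to the same value");
        }
        return props;
    }
}
```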
Co-author: Ron Dagostino <rdagostino@confluent.io>
Avoid using the non-public API KafkaFutureImpl in the Admin client's `*Result` class constructors.
This is particularly problematic for `DescribeConsumerGroupsResult` which currently has a
public constructor. For the other classes the rationale is simply consistency with the majority of
the `*Result` classes.
Reviewers: Ismael Juma <ismael@juma.me.uk>, David Jacot <djacot@confluent.io>, Luke Chen <showuon@gmail.com>
This patch adds the `ActiveBrokerCount` and the `FencedBrokerCount` metrics to the ZK controller. Note that `FencedBrokerCount` is always set to zero in the ZK controller.
Reviewers: Jason Gustafson <jason@confluent.io>
`ReplicationTest.test_replication_with_broker_failure` in KRaft mode sometimes fails with the following error in the log:
```
[2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-1 at offset 31727 (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: Offset mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end offset = 31728.
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532)
    at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
    at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)
[2021-08-31 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Partition __consumer_offsets-1 marked as failed (kafka.server.ReplicaFetcherThread)
```
The issue is due to a race condition in `ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created and populated before the partition is removed from the fetcher threads. This means that the fetch offset of the `InitialFetchState` could be outdated when the fetcher threads are re-started because the fetcher threads could have incremented the log end offset in between.
The patch fixes the issue by removing the partitions from the replica fetcher threads before creating the `InitialFetchState` for them.
Reviewers: Jason Gustafson <jason@confluent.io>
We restore the 3.4.x/3.5.x behavior unless the caller has set the property (note that ZKConfig
auto configures itself if certain system properties have been set).
I added a unit test that fails without the change and passes with it.
I also refactored the code to streamline the way we handle parameters passed to
KafkaZkClient and ZooKeeperClient.
See https://github.com/apache/zookeeper/pull/1129 for the details on why the behavior
changed in 3.6.0.
Credit to @rondagostino for finding and reporting this issue.
Reviewers: David Jacot <djacot@confluent.io>
The controller can skip sending updateMetadataRequest during the broker failure callback if there are offline partitions and the deleted brokers don't host any partitions.
Reviewers: Jun Rao <junrao@gmail.com>
After a topic is deleted, the topic is marked for deletion, and creating a topic with the same name throws a "topic already exists" exception. The error should instead indicate that the topic is marked for deletion.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
MINOR: Refactored the existing CheckpointFile in the core module and moved it to the server-common module.
CheckpointFile was refactored into a Java class in the server-common module and is reused by LeaderCheckpointFile and OffsetCheckpointFile.
This will be used by CommittedOffsetsFile which checkpoints remote log metadata partitions with respective offsets in the default RemoteLogMetadataManager implementation.
Existing tests are available for LeaderCheckpointFile, OffsetCheckpointFile.
Reviewers: Jun Rao <junrao@gmail.com>
The original code uses a RemoteLogManagerConfig class to store KIP-405 configs and adds three configs to LogConfig. This makes the code complicated and developers may be confused.
This PR allows us to access RemoteLogManagerConfig from KafkaConfig and do the same for LogConfig. Kafka developers will see the same interface for the KIP-405 configs. After this change, if we want to read remoteStorageEnable we should use LogConfig.tieredLogConfig.remoteStorageEnable instead of LogConfig.remoteStorageEnable. The same for localRetentionMs and localRetentionBytes. If we want to read configs in RemoteLogManagerConfig, we should use KafkaConfig.tieredKafkaConfig.xxx.
Reviewers: Satish Duggana <satishd@apache.org>, Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
When debugging issues with partition state, it's very useful to know the zkVersion that was written. This patch adds the zkVersion of LeaderAndIsr to logging in a few more places.
This patch refactors `ReplicaManager#becomeLeaderOrFollower` to avoid having to re-iterate over all the partitions to determine which ones should become leaders and which ones should become followers.
The patch also refactors how partitions are marked as offline when the log can't be created. Before the patch, we were iterating over all the partitions in the request or in the delta to mark them as offline if the log was not present. Now, we mark them as failed directly if the log cannot be created.
Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
A few small logging improvements:
- Only print error when it is not NONE
- The full list of remaining partitions is printed only at debug level
- Only backoff and print retry logging if there are remaining retries
Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
Small locking improvement to drop the group metadata lock before invoking the response callback in `GroupCoordinator#handleHeartbeat`.
Reviewers: David Jacot <djacot@confluent.io>
This patch ensures that ongoing compaction is aborted when `compact` is removed from the `cleanup.policy` of a topic.
Reviewers: Lucas Bradstreet <lucas@confluent.io>, Jun Rao <junrao@gmail.com>
Removes assertion added in #10471. It's unsafe to assert that
there are partition movements ongoing for some of the tests in
the suite because partitions in some of the tests have 0 data,
which may complete reassignment before `verify` can run.
Tests pass locally.
Reviewers: Luke Chen <showuon@gmail.com>, Ismael Juma <ismael@juma.me.uk>
The BrokerState metric always has a value of 0, for NOT_RUNNING, in KRaft
clusters. This patch fixes it and adds a test.
Reviewers: Ismael Juma <ismael@juma.me.uk>
After we have shrunk the ISR, we have an opportunity to advance the high watermark. We do this currently in `maybeShrinkIsr` after the synchronous update through ZK. For the `AlterIsr` path, however, we cannot rely on this call since the request is sent asynchronously. Instead we should attempt to advance the high watermark in the callback when the `AlterIsr` response returns successfully.
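Roughly, the intent is the following; the types and method names below are simplified stand-ins, not the actual `Partition` API:
```scala
object AlterIsrCallbackSketch {
  // Hypothetical, simplified partition stand-in; the real Partition class is far richer.
  class Partition {
    def applyIsrShrink(newIsr: Set[Int]): Unit = ()
    def maybeIncrementHighWatermark(): Unit = ()
  }

  // On a successful AlterIsr response, apply the shrunken ISR and then try to
  // advance the high watermark, since the smaller ISR may already satisfy it.
  def onAlterIsrResponse(partition: Partition, error: Option[Throwable], newIsr: Set[Int]): Unit =
    error match {
      case None =>
        partition.applyIsrShrink(newIsr)
        partition.maybeIncrementHighWatermark()
      case Some(_) =>
        () // leave state unchanged; the ISR change will be retried
    }
}
```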
Reviewers: David Jacot <djacot@confluent.io>
Stop the replica and resign the coordinators when a replica gets reassigned away from a topic partition.
1. Implement localChanges in TopicsDelta and TopicDelta to return all of the partitions that were deleted, became leader and became follower for the given broker id.
2. Add tests for TopicsDelta::localChanges
3. Resign coordinators that were moved away from the consumer offset and transaction topic partitions.
4. Add replica manager tests for testing reassignment of replicas and removal of topic.
5. Add a new type LocalReplicaChanges that encapsulates the topic partitions that were deleted, became leader, or became follower.
Reviewers: Jun Rao <junrao@gmail.com>
AbstractFetcherThread#truncateOnFetchResponse is used with IBP 2.7 and above to truncate partitions based on diverging epoch returned in fetch responses. Truncation should only be performed for partitions that are still owned by the fetcher and this check should be done while holding partitionMapLock to ensure that any partitions removed from the fetcher thread are not truncated. Truncation will be performed by any new fetcher that owns the partition when it restarts fetching.
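A simplified sketch of the locking discipline, with hypothetical stand-ins for the fetcher's partition state:
```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

object TruncateOnFetchSketch {
  case class EpochEndOffset(leaderEpoch: Int, endOffset: Long)

  // Hypothetical stand-ins for the fetcher thread's state.
  val partitionMapLock = new ReentrantLock()
  val partitionStates = mutable.Map.empty[String, Long] // partition -> fetch offset

  def truncate(tp: String, divergingEpoch: EpochEndOffset): Unit = ()

  // Only truncate partitions the fetcher still owns, and do it while holding
  // partitionMapLock so a concurrent removal cannot race with the truncation.
  def truncateOnFetchResponse(divergingEpochs: Map[String, EpochEndOffset]): Unit = {
    partitionMapLock.lock()
    try {
      divergingEpochs.foreach { case (tp, epochEndOffset) =>
        if (partitionStates.contains(tp))
          truncate(tp, epochEndOffset)
      }
    } finally {
      partitionMapLock.unlock()
    }
  }
}
```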
Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
When the high watermark is contained in a non-active segment, we are not correctly bounding it by the hwm. This means that uncommitted records may overwrite committed data. I've separated out the bounding point tests to check the hwm case in addition to the existing active segment case.
Reviewers: Jun Rao <junrao@gmail.com>
In this PR, I've renamed kafka.log.Log to kafka.log.UnifiedLog. With the advent of KIP-405, going forward the existing Log class would present a unified view of local and tiered log segments, so we rename it to UnifiedLog. The motivation for this PR is also the same as outlined in this design document: https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit.
This PR is a follow-up to #10280 where we had refactored the Log layer introducing a new kafka.log.LocalLog class.
Note: the Log class name had to be hardcoded to ensure metrics are defined under the Log class (for backwards compatibility). Please refer to the newly introduced UnifiedLog.metricName() method.
Reviewers: Cong Ding <cong@ccding.com>, Satish Duggana <satishd@apache.org>, Jun Rao <junrao@gmail.com>
When processing the topics delta, make sure that the replica manager partition state and replica fetcher state matches the information included in the topic delta. Also ensure that delayed operations are processed after the follower state change has been made since that is what allows them to be completed.
Reviewers: Jason Gustafson <jason@confluent.io>
Validate that KRaft controllers are members of the KRaft quorum, and non-controllers are not.
This validation assumes that controllers and brokers have the same ID only when they are
co-located.
Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@gmail.com>, Luke Chen <showuon@gmail.com>
Most of [KAFKA-13132](https://issues.apache.org/jira/browse/KAFKA-13132) has been resolved, but there is one part of one case not covered.
From the ticket:
`2. We only assign the topic ID when we are associating the log with the partition in replicamanager for the first time`
We covered the case where the log already exists when the leader epoch is _equal_ (i.e., no updates besides the topic ID), but we don't cover the update case where the leader epoch is bumped and we already have the log associated with the partition.
This PR ensures we correctly assign topic ID in the makeLeaders/Followers path when the log already exists.
I've also added a test for the bumped leader epoch scenario.
Reviewers: Jason Gustafson <jason@confluent.io>
When dealing with the default resource, BrokerMetadataPublisher should translate its name from the empty
string (KRaft convention) to "<default>" (ZK convention). In the long term, we should move from
using a string for this to using an Option[String].
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Controlled shutdown in KRaft is signaled through a heartbeat request with the `shouldShutDown` flag set to true. When we begin controlled shutdown, we should immediately schedule the next heartbeat instead of waiting for the next periodic heartbeat. This allows the broker to shutdown more quickly.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This patch improves the return type for `scheduleAppend` and `scheduleAtomicAppend`. Previously we were using a `Long` value and using both `null` and `Long.MaxValue` to distinguish between different error cases. In this PR, we change the return type to `long` and only return a value if the append was accepted. For the error cases, we instead throw an exception. For this purpose, the patch introduces a couple new exception types: `BufferAllocationException` and `NotLeaderException`.
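A sketch of what caller-side handling could look like under the new contract; the trait and exception classes below are simplified stand-ins for the real raft client API:
```scala
object ScheduleAppendSketch {
  // Hypothetical stand-ins for the raft client API described above.
  class NotLeaderException(msg: String) extends RuntimeException(msg)
  class BufferAllocationException(msg: String) extends RuntimeException(msg)

  trait RaftClient[T] {
    /** Returns the expected offset of the appended records, or throws on error. */
    def scheduleAppend(epoch: Int, records: java.util.List[T]): Long
  }

  def appendOrHandle[T](client: RaftClient[T], epoch: Int, records: java.util.List[T]): Option[Long] =
    try {
      Some(client.scheduleAppend(epoch, records)) // accepted: a real offset, no sentinel values
    } catch {
      case _: NotLeaderException       => None // lost leadership; caller should resign or retry elsewhere
      case _: BufferAllocationException => None // no buffer space; caller may back off and retry
    }
}
```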
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
RaftClient's scheduleAppend may split the list of records into multiple
batches. This means that it is possible for the active controller to
see a committed offset for which it doesn't have an in-memory snapshot.
If the active controller needs to renounce and it is missing an
in-memory snapshot, then revert the state and reregister the Raft
listener. This will cause the controller to replay the entire metadata
partition.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch improves logging around follower truncation. Specifically, the log message includes the end epoch state obtained from the leader and the resulting truncation state on the follower. An example log message is given below:
```
Truncating partition topic1-0 with TruncationState(offset=5, completed=true) due to leader epoch and offset EpochEndOffset(errorCode=0, partition=0, leaderEpoch=0, endOffset=5)
```
Reviewers: Jason Gustafson <jason@confluent.io>
The configs `alter.config.policy.class.name` and `create.topic.policy.class.name` are not yet supported by KRaft. KRaft servers should fail startup if any of these are configured.
Reviewers: Luke Chen <showuon@gmail.com>, David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
In 3.0, there was a change that resulted in no longer assigning topic IDs to the log and the partition.metadata file in certain upgrade scenarios, specifically when upgrading from IBP 2.7 or below to 3.0. In this case, there may not be a bump to the leader epoch when the topicId is assigned by the controller, so the LeaderAndIsr request from the controller would be ignored by the replica. This PR fixes the problem by adding a check for whether we need to handle the LeaderAndIsr request given a new topic ID when one is not yet assigned in the log and code to assign a topic ID when the log is already associated to a partition in ReplicaManager.
Reviewers: Jason Gustafson <jason@confluent.io>
This PR removes the `METADATA` API from the KRaft controller as the controller does not yet implement the metadata fetch functionality completely.
Without the change (as per the JIRA https://issues.apache.org/jira/browse/KAFKA-13143), the API would return an empty list of topics making the caller incorrectly think that there were no topics in the cluster which could be confusing. After this change the describe and list topic APIs timeout on the controller endpoint when using the `kafka-topics` CLI (which is the same behavior as create_topic).
Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
When expiring transactionalIds, we group the tombstones together into batches. Currently there is no limit on the size of these batches, which can lead to `MESSAGE_TOO_LARGE` errors when a bunch of transactionalIds need to be expired at the same time. This patch fixes the problem by ensuring that the batch size respects the configured limit. Any transactionalIds which are eligible for expiration and cannot be fit into the batch are postponed until the next periodic check.
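A rough sketch of the batching idea, assuming a hypothetical `Tombstone` type with a precomputed serialized size:
```scala
object ExpirationBatchingSketch {
  // Hypothetical tombstone with a precomputed serialized size.
  case class Tombstone(transactionalId: String, sizeInBytes: Int)

  // Pack a prefix of the expired ids into one batch without exceeding maxBatchSize;
  // everything that does not fit is postponed to the next periodic expiration check.
  def splitForBatch(expired: List[Tombstone], maxBatchSize: Int): (List[Tombstone], List[Tombstone]) = {
    var bytes = 0
    val batch = expired.takeWhile { t => bytes += t.sizeInBytes; bytes <= maxBatchSize }
    (batch, expired.drop(batch.size))
  }
}
```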
Reviewers: David Jacot <djacot@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
In 2.8, the dump log output regressed to print batch level information for each record, which makes the output much noisier. This patch changes the output to what it was in 2.7 and previous versions. We only print batch metadata at the batch level.
Reviewers: David Arthur <mumrah@gmail.com>, Ismael Juma <ismael@juma.me.uk>
This patch fixes BrokerMetadataPublisher.findGhostReplicas (renamed to findStrayPartitions)
so that it returns the stray partitions. Previously it was returning the non-stray partitions. This
caused all of these partitions to get deleted on startup by mistake.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@gmail.com>
These failures were caused by a46b82bea9. Details for each test:
* message_format_change_test: use IBP 2.8 so that we can write in older message
formats.
* compatibility_test_new_broker_test_failures: fix down-conversion path to handle
empty record batches correctly. The record scan in the old code ensured that
empty record batches were never down-converted, which hid this bug.
* upgrade_test: set the IBP 2.8 when message format is < 0.11 to ensure we are
actually writing with the old message format even though the test was passing
without the change.
Verified with ducker that some variants of these tests failed without these changes
and passed with them. Also added a unit test for the down-conversion bug fix.
Reviewers: Jason Gustafson <jason@confluent.io>
When the replica fetcher receives a top-level error in the fetch response, it marks all partitions as failed and adds a backoff delay before resuming fetching from them. In addition to this, there is an additional backoff enforced after the top-level error is handled, so we end up waiting twice the configured backoff time before resuming. This patch removes this extra backoff.
Reviewers: Jason Gustafson <jason@confluent.io>
In `deleteLogs`, we use a consistent value for `fileDeleteDelayMs`
for the whole method. In `DynamicBrokerConfig.reconfigure`, it's
a minor readability improvement, but there should be no change in
behavior.
Reviewers: David Arthur <mumrah@gmail.com>
This patch closes a test gap where we do not check ReplicaManager metrics remain as expected. There
was a bug in 2.8 where the metrics moved under a different class name for the KRaft case. Having
such tests would have helped identify the bug.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Checking the documentation, we must still support the `--zookeeper` option in 3 places (alter and describe):
1. user configs where the config is a SCRAM mechanism name (i.e. a SCRAM credential for a user)
2. update broker configs for a particular broker when that broker is down
3. broker default configs when all brokers are down
Reference:
1. [config SCRAM Credentials](https://kafka.apache.org/documentation/#security_sasl_scram_credentials)
2. [Update config before broker started](https://kafka.apache.org/documentation/#dynamicbrokerconfigs)
So, after this PR, we only support `--zookeeper` for the `users` and `brokers` entity types, and we add the corresponding argument parsing rules and tests.
Reviewers: Ron Dagostino <rdagostino@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Also:
* Deprecate `log.message.format.version` and `message.format.version`.
* Log broker warning if the deprecated config values are ignored due to
the inter-broker protocol version.
* Log warning if `message.format.version` is set via `ConfigCommand`.
* Always down-convert if fetch version is v3 or lower.
* Add tests to verify new message format version based on the
inter-broker protocol version.
* Adjust existing tests that create topics with an older message format to
have the inter-broker protocol set to 2.8.
* Add upgrade note.
Note that the log compaction change to always write new segments with
record format v2 if the IBP is 3.0 or higher will be done as part of
KAFKA-13093 (with Kafka 3.1 as the target release version).
Reviewers: David Jacot <djacot@confluent.io>, David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
Check and verify generated snapshots for the controllers and the
brokers. Assert reader state when reading last log append time.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Fix a bug where if a snapshot file is deleted while we're running snapshot recovery,
a NoSuchFileException will be thrown and snapshot recovery will fail.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
After noticing increased LISR times, we discovered a lot of time was spent synchronously flushing the partition metadata file. This PR changes the code so we asynchronously flush the files.
We ensure files are flushed before appending, renaming or closing the log to ensure we have the partition metadata information on disk. Three new tests have been added to address these cases.
Reviewers: Lucas Bradstreet <lucas@confluent.io>, Jun Rao <junrao@gmail.com>
Support the KIP-455 reassignment API when in KRaft mode. Reassignments
which merely rearrange replicas complete immediately. Those that only
remove replicas complete immediately if the ISR would be non-empty
after the specified removals. Reassignments that add one or more
replicas follow the KIP-455 pattern of adding all the adding replicas
to the replica set, and then waiting for the ISR to include all the new
replicas before completing. Changes to the replica sets are
accomplished via PartitionChangeRecord.
Reviewers: Jun Rao <junrao@gmail.com>
TL;DR:
This PR implements the details of the Log layer refactor, as outlined in this document: https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit. A few details may differ from the doc, but it is more or less the same.
STRATEGY:
In this PR, I've extracted a new class called LocalLog out of Log. Currently LocalLog is purely an implementation detail that's not exposed outside the Log class (except for tests). The encapsulation is that each Log instance wraps around a LocalLog instance.
This new LocalLog class attempts to encompass most of the local log responsibilities surrounding the segments map, which were previously present in Log. Note that not all local log responsibilities have been moved over to this new class (yet). The criterion I used was to preserve (for now), in the existing Log class, any logic that is mingled in a complex manner with the logStartOffset, the LeaderEpochCache, or the ProducerStateManager.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Satish Duggana <satishd@apache.org>, Jun Rao <junrao@gmail.com>
Fix a simulation test failure by:
1. Relaxing the validation of the snapshot id against the log start
   offset when the state machine attempts to create a new snapshot. It
   is safe to just ignore the request instead of throwing an exception
   when the snapshot id is less than the log start offset.
2. Fixing the MockLog implementation so that it uses startOffset both
externally and internally.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Add an internal configuration in order to facilitate system and integration tests that need a smaller
log segment size. Since this is not intended for use in production, log an ERROR message if it is
set to a non-default value.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This patch adds a check to ensure that principal builder implementations implement `KafkaPrincipalSerde` as specified in KIP-590: https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller. This patch also changes the default value of `principal.builder.class` to `DefaultKafkaPrincipalBuilder`, which was already the implicit behavior when no principal builder was specified.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
In KRaft mode, Apache Kafka 2.8.0 advertises the socket port instead of the configured advertised port.
A broker with the following configuration:
listeners=PUBLIC://0.0.0.0:19092,REPLICATION://0.0.0.0:9091
advertised.listeners=PUBLIC://envoy-kafka-broker:9091,REPLICATION://kafka-broker1:9091
advertises envoy-kafka-broker:19092 on the PUBLIC listener; however, I would expect
envoy-kafka-broker:9091 to be advertised. In ZooKeeper mode it works as expected. This PR
changes the BrokerServer class so that in KRaft mode the configured advertised port is
registered as expected.
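A rough sketch of the intended registration logic; the types and the port-0 fallback rule are illustrative assumptions rather than the exact BrokerServer code:
```scala
object AdvertisedListenerSketch {
  // Hypothetical, simplified endpoint description.
  case class Endpoint(listenerName: String, host: String, port: Int)

  // Register the configured advertised port; only fall back to the bound socket
  // port when no explicit port was configured (modelled here as port 0).
  def effectiveAdvertisedPort(advertised: Endpoint, boundSocketPort: Int): Int =
    if (advertised.port != 0) advertised.port else boundSocketPort
}
```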
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
In KRaft mode, the flow of a request from a client to the controller is as follows:
1. The client sends the request to a random controller (e.g. controller A).
2. Controller A forwards the request to the active controller (e.g. controller B) to handle it.
3. After the active controller B completes the request, controller A receives the response and checks it:
3.1. If the response has a "disconnected" or "NOT_CONTROLLER" error, the cached active controller has changed, so clear the cached active controller and wait for the next retry to fetch the updated active controller from `controllerNodeProvider`.
3.2. Otherwise, complete the request and respond to the client.
This bug involves 2 issues:
1. The "NOT_CONTROLLER" error is not correctly sent back to the requester; instead, `UNKNOWN_SERVER_ERROR` is returned. The reason is that the `NotControllerException` is wrapped in a `CompletionException` when the `Future` is completed exceptionally, and the `CompletionException` does not match any of the Errors we defined, so `UNKNOWN_SERVER_ERROR` is returned. Even if we don't want to return the `NotControllerException` to the client, we still need to see it so we can react to it.
fix 1: unwrap the `CompletionException` before encoding the exception to an error.
2. Fixing the 1st bug is not enough. After that fix, the client can successfully get `NotControllerException` and keeps retrying until it times out. So why doesn't it hit flow `3.1` above, given that it has `NotControllerException`? The reason is that we wrapped the original request in an `EnvelopeRequest` and forwarded it to the active controller. After the active controller completes the request and responds with `NotControllerException`, the response is wrapped in an `EnvelopeResponse` **with no error** and sent back. That is, in flow `3.1`, we only see "no error" on the `EnvelopeResponse`, not the `NotControllerException` inside.
fix 2: Make the envelope response return `NotControllerException` if the controller response contains one, so that we can catch the `NotControllerException` on the envelope response and update the cached active controller.
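A minimal sketch of fix 1, the unwrapping step, with hypothetical stand-ins for Kafka's error mapping:
```scala
import java.util.concurrent.CompletionException

object ForwardingErrorSketch {
  // Hypothetical stand-ins for Kafka's NotControllerException / Errors mapping.
  class NotControllerException(msg: String) extends RuntimeException(msg)

  sealed trait ErrorCode
  case object NotController extends ErrorCode
  case object UnknownServerError extends ErrorCode

  // Fix 1: unwrap CompletionException so the real cause (e.g. NOT_CONTROLLER)
  // is mapped to the right error code instead of UNKNOWN_SERVER_ERROR.
  def toErrorCode(t: Throwable): ErrorCode = {
    val cause = t match {
      case ce: CompletionException if ce.getCause != null => ce.getCause
      case other => other
    }
    cause match {
      case _: NotControllerException => NotController
      case _ => UnknownServerError
    }
  }
}
```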
Reviewers: wenbingshen <oliver.shen999@gmail.com>, Ismael Juma <ismael@juma.me.uk>, dengziming <dengziming1993@gmail.com>, Jason Gustafson <jason@confluent.io>
When a node is serving as both broker and controller, we should only rely on the controller to write new snapshots.
Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
This patch fixes a few request listener specs. We were missing "broker" for many APIs which are now implemented in KRaft and there were a couple cases where we had unnecessarily exposed a controller-only API on the broker.
Reviewers: Jason Gustafson <jason@confluent.io>
The broker should trigger a snapshot once
metadata.log.max.record.bytes.between.snapshots has been exceeded.
Reviewers: Jason Gustafson <jason@confluent.io>
The sliding window + takeWhile behavior over a sequence seems somewhat
different between Scala 2.12 and Scala 2.13. This PR works around the
difference by using foreach with an early return.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Do not update the commit sensor if the commit failed. The patch also adds 2 unit tests: the first for the `OFFSET_METADATA_TOO_LARGE` error, and the second to cover the case where one offset is committed and the other fails with `OFFSET_METADATA_TOO_LARGE`. Both of these cases were previously uncovered.
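A small sketch of the intended sensor behavior, using a hypothetical sensor stand-in:
```scala
object CommitSensorSketch {
  // Hypothetical sensor stand-in; the real code uses Kafka's metrics Sensor.
  class Sensor { def record(value: Double): Unit = () }

  val offsetCommitsSensor = new Sensor

  // Only record successfully committed offsets; failures such as
  // OFFSET_METADATA_TOO_LARGE must not inflate the commit rate.
  def onCommitComplete(results: Map[String, Option[Throwable]]): Unit = {
    val successes = results.count { case (_, error) => error.isEmpty }
    if (successes > 0)
      offsetCommitsSensor.record(successes.toDouble)
  }
}
```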
Reviewers: Jason Gustafson <jason@confluent.io>
Updated FetchRequest and FetchResponse to use topic IDs rather than topic names.
Some of the complicated code is found in FetchSession and FetchSessionHandler.
We need to be able to store topic IDs and maintain a cache on the broker for IDs that may not have been resolved. On incremental fetch requests, we will try to resolve them, or remove them if they are in toForget.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
Fix NPE from addingReplicas and removingReplicas. Make addingReplicas and
removingReplicas in PartitionRecord non-nullable as described in KIP-746.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This implements the request and response portion of KIP-709. It updates the OffsetFetch request and response to support fetching offsets for multiple consumer groups at a time. If the broker does not support the new OffsetFetch version, clients can revert to the previous behaviour and use a request for each coordinator.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Konstantine Karantasis <konstantine@confluent.io>
Set the default assignor to ["range", "cooperative-sticky"] to make it easier for users to switch over to cooperative rebalancing by using only a single rolling bounce.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This PR implements broker-side KRaft snapshots, including both saving and
loading. The code for triggering a periodic broker-side snapshot will come in a
follow-on PR. Loading should work with just this PR. It also implements
reloading broker snapshots after initialization.
In order to facilitate snapshots, this PR introduces the concept of
MetadataImage and MetadataDelta. MetadataImage represents the metadata state
retained in memory. It is basically a generalization of MetadataCache that
includes a few things that MetadataCache does not (such as features and client
quotas.) KRaftMetadataCache is now an accessor for the data stored in this object.
Similarly, MetadataImage replaces CachedConfigRepository and ClientQuotaCache.
It also subsumes kafka.server.metadata.MetadataImage and related classes.
MetadataDelta represents a change to a MetadataImage. When a KRaft snapshot is
loaded, we will accumulate all the changes into a MetadataDelta first, prior to
applying it. If we must reload a snapshot because we fell too far behind while
consuming metadata, the resulting MetadataDelta will contain all the changes
needed to catch us up. During normal operation, MetadataDelta is also used to
accumulate the changes of each incoming batch of metadata records. These
incremental deltas should be relatively small.
I have removed the logic for updating the various manager objects from
BrokerMetadataListener and placed it into BrokerMetadataPublisher. This makes
it easier to unit test BrokerMetadataListener.
Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
This PR includes changes to KafkaRaftClient and KafkaMetadataLog to support periodic
cleaning of old log segments and snapshots.
Four new public config keys are introduced: metadata.log.segment.bytes,
metadata.log.segment.ms, metadata.max.retention.bytes, and
metadata.max.retention.ms.
These are used to configure the log layer as well as the snapshot cleaning logic. Snapshot
and log cleaning is performed based on two criteria: total metadata log + snapshot size
(metadata.max.retention.bytes), and max age of a snapshot (metadata.max.retention.ms).
Since we have a requirement that the log start offset must always align with a snapshot,
we perform the cleaning on snapshots first and then clean what logs we can.
The cleaning algorithm follows:
1. Delete the oldest snapshot.
2. Advance the log start offset to the new oldest snapshot.
3. Request that the log layer clean any segments prior to the new log start offset
4. Repeat this until the retention size or time is no longer violated, or only a single
snapshot remains.
The cleaning process is triggered every 60 seconds from the KafkaRaftClient polling
thread.
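A rough sketch of that cleaning loop, with heavily simplified snapshot and log-layer stand-ins:
```scala
object MetadataLogCleaningSketch {
  // Heavily simplified stand-ins: a snapshot is described only by its end offset,
  // size, and age; the real code works with snapshot ids and the log layer.
  case class Snapshot(endOffset: Long, sizeBytes: Long, ageMs: Long)

  def deleteSnapshot(s: Snapshot): Unit = ()
  def advanceLogStartOffset(offset: Long): Unit = ()
  def cleanSegmentsBefore(offset: Long): Unit = ()

  // Repeatedly drop the oldest snapshot, advance the log start offset to the new
  // oldest snapshot, and clean older segments, until retention is satisfied or
  // only a single snapshot remains.
  def clean(snapshots: List[Snapshot], totalSizeBytes: () => Long,
            retentionBytes: Long, retentionMs: Long): Unit = {
    var remaining = snapshots.sortBy(_.endOffset)
    def retentionViolated: Boolean =
      totalSizeBytes() > retentionBytes || remaining.headOption.exists(_.ageMs > retentionMs)

    while (remaining.size > 1 && retentionViolated) {
      deleteSnapshot(remaining.head)
      remaining = remaining.tail
      advanceLogStartOffset(remaining.head.endOffset)
      cleanSegmentsBefore(remaining.head.endOffset)
    }
  }
}
```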
Reviewers: José Armando García Sancio <jsancio@gmail.com>, dengziming <dengziming1993@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
This patch ensures that `maxTimestampSoFar` and `offsetOfMaxTimestampSoFar` are consistent with each other. It does so by storing them together. It relates to KIP-734 which exposes them via the admin client.
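One way to guarantee the pair stays consistent is to store both values in a single immutable object, roughly as sketched below (names are illustrative, not the actual patch):
```scala
object MaxTimestampSketch {
  // Keep the max timestamp and the offset where it occurred in a single immutable
  // value so readers can never observe one updated without the other.
  case class TimestampAndOffset(timestamp: Long, offset: Long)

  @volatile private var maxTimestampAndOffsetSoFar = TimestampAndOffset(-1L, -1L)

  def maybeUpdate(timestamp: Long, offset: Long): Unit =
    if (timestamp > maxTimestampAndOffsetSoFar.timestamp)
      maxTimestampAndOffsetSoFar = TimestampAndOffset(timestamp, offset)
}
```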
Reviewers: Ismael Juma <ismael@juma.me.uk>, David Jacot <djacot@confluent.io>
Add the record append time to Batch. Change SnapshotReader to set this time to the
time of the last log in the last batch. Fix the QuorumController to remember the last
committed batch append time and to store it in the generated snapshot.
Reviewers: David Arthur <mumrah@gmail.com>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Segment and index files are currently renamed with a .deleted
suffix prior to async deletion. This serves two purposes, to
resume deletion on broker failure and also protect against
deletion of new segments during truncation (due to deletion
being async).
We should do the same for snapshot files. While they are not subject
to issues around resuming deletion due to the stray snapshot
scanning which is performed on log initialization, we can end up
with situations where truncation queues snapshots for deletion, but
prior to deletion new segments with the same snapshot file name are
created. Async deletion can then delete these new snapshots.
This patch offers a two-stage snapshot deletion which first renames
and removes the segments in question from the ProducerStateManager,
allowing the Log to asynchronously delete them.
Credit to Kowshik Prakasam <kowshik@gmail.com> for finding this issue
and creating the test demonstrating the failure.
Co-authored-by: Kowshik Prakasam <kowshik@gmail.com>
Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
In getListOffsetsCalls, we rebuild the cluster snapshot for every topic partition. Instead, we should reuse a single snapshot.
For manual testing (using AK 2.8), I've passed a map of 6K topic partitions to listOffsets.
Without snapshot reuse:
duration of building futures from metadata response: **15582** milliseconds
total duration of listOffsets: **15743** milliseconds
With reuse:
duration of building futures from metadata response: **24** milliseconds
total duration of listOffsets: **235** milliseconds
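A minimal sketch of the reuse idea, with hypothetical simplified metadata types:
```scala
object ListOffsetsSnapshotSketch {
  // Hypothetical, simplified metadata types standing in for the admin client internals.
  case class Cluster(leaders: Map[String, Int])
  case class MetadataResponse(leaders: Map[String, Int]) {
    def buildCluster(): Cluster = Cluster(leaders) // expensive in the real code
  }

  // Build the cluster snapshot once and reuse it for every partition, instead of
  // rebuilding it inside the per-partition loop.
  def leadersForPartitions(response: MetadataResponse, partitions: Seq[String]): Map[String, Option[Int]] = {
    val cluster = response.buildCluster() // hoisted out of the loop
    partitions.map(tp => tp -> cluster.leaders.get(tp)).toMap
  }
}
```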
Reviewers: Luke Chen <showuon@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
This implements KIP-699: https://cwiki.apache.org/confluence/display/KAFKA/KIP-699%3A+Update+FindCoordinator+to+resolve+multiple+Coordinators+at+a+time
It updates FindCoordinator request and response to support resolving multiple coordinators at a time. If a broker does not support the new FindCoordinator version, clients can revert to the previous behaviour and use a request for each coordinator.
Reviewers: David Jacot <djacot@confluent.io>, Tom Bentley <tbentley@redhat.com>, Sanjana Kaundinya <skaundinya@gmail.com>
This patch adds two new apis to support topic deletion using topic IDs or names. It uses a new class `TopicCollection` to keep a collection of topics defined either by names or IDs. Finally, it modifies `DeleteTopicsResult` to support both names and IDs and deprecates the old methods which have become ambiguous. Eventually we will want to deprecate the old `deleteTopics` apis as well, but this patch does not do so.
Reviewers: Jason Gustafson <jason@confluent.io>
Add the ability for KRaft controllers to generate snapshots based on the number of new record bytes that have
been applied since the last snapshot. Add a new configuration key to control this parameter. For now, it
defaults to being off, although we will change that in a follow-on PR. Also, fix LocalLogManager so that
snapshot loading is only triggered when the listener is not the leader.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Fix the JavaDoc for the ClientQuotaManagerConfig#throttle function to
refer to the correct parameter name.
BrokerEndPointTest#testHashAndEquals should test the BrokerEndPoint
class, rather than the MetadataBroker class.
TopicConfigHandler: make the kafkaController argument optional, since we won't
have it when in KRaft mode.
Remove the unnecessary ConfigRepository argument for the Partition class.
Remove the unused TestUtils#deleteBrokersInZk function.
Reviewers: Jason Gustafson <jason@confluent.io>
Remove getNonExistingTopics, which was not necessary. MetadataCache
already lets callers check for the existence of topics by calling
MetadataCache#contains.
Add MetadataCache#getAliveBrokerNode and getAliveBrokerNodes. This
simplifies the calling code, which always wants a Node.
Fix a case where we were calling getAliveBrokers and filtering by id,
rather than simply calling getAliveBroker(id) and making use of the hash
map.
Reviewers: Jason Gustafson <jason@confluent.io>, Jose Sancio <jsancio@gmail.com>
When we find a .swap file on startup, we typically want to rename and replace it as .log, .index, .timeindex, etc. as a way to complete any ongoing replace operations. These swap files are usually known to have been flushed to disk before the replace operation begins.
One flaw in the current logic is that we recover these swap files on startup and as part of that, end up truncating the producer state and rebuild it from scratch. This is unneeded as the replace operation does not mutate the producer state by itself. It is only meant to replace the .log file along with corresponding indices. Because of this unneeded producer state rebuild operation, we have seen multi-hour startup times for clusters that have large compacted topics.
This patch fixes the issue. With ext4 in ordered mode, metadata writes are ordered regardless of whether the shutdown was clean or unclean. As a result, we rework the recovery workflow as follows:
1. If there are any .cleaned files, we delete all .swap files with higher or equal offsets (due to KAFKA-6264), and we also delete the .cleaned files. If there is no .cleaned file, this step does nothing.
2. Any .log.swap files left after step 1, together with their index files, must have been fully renamed from .cleaned and are therefore complete (renaming from .cleaned to .swap is done in reverse offset order). We rename these .log.swap files and their corresponding index files to regular files, while deleting the original files from compaction or segment split if they haven't been deleted yet.
3. Do log splitting for legacy log segments with offset overflow (KAFKA-6264).
4. If there are any other index swap files left, they must come from a partial rename from .swap files to regular files. We can simply rename them to regular files.
credit: some code is copied from @dhruvilshah3 's PR: #10388
Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Jun Rao <junrao@gmail.com>
If fetchOffset < startOffset, we currently throw OffsetOutOfRangeException when attempting to read from the log in the regular case. But for diverging epochs, we return Errors.NONE with the new leader start offset, hwm, etc. ReplicaFetcherThread throws OffsetOutOfRangeException when processing responses with Errors.NONE if the leader's offsets in the response are out of range, and this moves the partition to a failed state. The PR adds a check for this case when processing fetch requests and throws OffsetOutOfRangeException regardless of epoch.
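A small sketch of the added check; the exception class and message are illustrative stand-ins:
```scala
object FetchOffsetCheckSketch {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)

  // Throw OffsetOutOfRangeException whenever the fetch offset is below the log
  // start offset, even on the diverging-epoch path, so the follower handles it
  // as an out-of-range error rather than failing the partition.
  def checkFetchOffset(tp: String, fetchOffset: Long, logStartOffset: Long): Unit =
    if (fetchOffset < logStartOffset)
      throw new OffsetOutOfRangeException(
        s"Received request for offset $fetchOffset for partition $tp, " +
          s"but we only have log segments starting from offset $logStartOffset.")
}
```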
Reviewers: Luke Chen <showuon@gmail.com>, Nikhil Bhatia <rite2nikhil@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
This patch fixes the `ConsumerGroupCommand` to correctly handle missing offsets, which are returned as `null` by the admin API.
Reviewers: David Jacot <djacot@confluent.io>
Removed the condition that throws the error. Now we return UNKNOWN_TOPIC_ID, which allows clients to retry instead of failing. Updated the test for IBP < 2.8 that tries to delete topics using IDs.
Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
When the check "whether the last offset of the last batch in this segment overflows the indexes" returns an unexpected result, the path of the segment should be printed so that users can locate the problem.
Reviewers: Luke Chen, Guozhang Wang <wangguoz@gmail.com>
Use MockConfigRepository rather than CachedConfigRepository in unit
tests. This is useful for an upcoming change that will remove
CachedConfigRepository.
Reviewers: David Arthur <mumrah@gmail.com>
We added the DescribeQuorum API in KIP-595. This patch adds the logic to forward DescribeQuorum requests to the controller when KRaft is enabled. The KRaft broker listener has already been enabled in DescribeQuorumRequest.json. The zk broker is not enabled, however, so DescribeQuorum requests will not be advertised and will be rejected at the network layer.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, David Arthur <mumrah@gmail.com>
This patch fixes a match error in `TestRaftServer` which causes the process to crash. We should match against `null` to handle the case of a timeout when polling for events.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
To avoid overflowing the 4-byte relative offsets in the log index, the log cleaner groups log segments so that each group's offset range does not exceed Int.MaxValue.
This offset check currently does not consider whether the next log segment is empty, so an empty log file is left behind roughly every 2^31 messages.
These leftover empty logs are reprocessed on every clean cycle and rewritten with the same empty content, which causes unnecessary I/O.
For the __consumer_offsets topic, we can normally set cleanup.policy to compact,delete to work around this.
My cluster runs 0.10.1.1, but after analyzing the trunk code, it should have the same problem too.
Co-authored-by: Liu Qiang(BSS-HZ) <qliu.zj@best-inc.com>
Reviewers: Luke Chen <showuon@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
We should process the entire batch in `BrokerMetadataListener` and make sure that `hasNext` is called before calling `next` on the iterator. The previous code worked because the raft client kept track of the position in the iterator, but it caused NoSuchElementException to be raised when the reader was empty (as might be the case with control records).
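A minimal sketch of the defensive iteration pattern:
```scala
object BatchReaderSketch {
  // Process the whole reader defensively: always check hasNext before next, so an
  // empty reader (e.g. only control records) does not raise NoSuchElementException.
  def handleCommits[T](reader: java.util.Iterator[java.util.List[T]])(handle: java.util.List[T] => Unit): Unit =
    while (reader.hasNext) {
      handle(reader.next())
    }
}
```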
Reviewers: Jason Gustafson <jason@confluent.io>
Added tiered storage related configs including remote log manager configs.
Added local log retention configs to LogConfig.
Added tests for the added configs.
Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
Upon upgrading to IBP 2.8, topic ID can end up getting reassigned which can cause errors in LeaderAndIsr handling when the partition metadata files from the previous ID are still on the broker.
Topic IDs are stored in the TopicZNode. The behavior of the code before this fix is as follows:
Consider we had a controller with IBP 2.8+. Each topic will be assigned topic IDs and LeaderAndIsr requests will write partition.metadata files to the brokers. If we re-elect the controller and end up with a controller with an older IBP version and we reassign partitions, the TopicZNode is overwritten and we lose the topic ID. Upon electing a 2.8+ IBP controller, we will see the TopicZNode is missing a topic ID and will generate a new one. If the broker still has the old partition metadata file, we will see an ID mismatch that causes the error.
This patch changes the controller logic so that we maintain the topic ID in the controller and the ZNode even when IBP < 2.8. This means that in the scenario above, reassigning partitions will no longer result in losing the topic ID.
Topic IDs may be lost when downgrading the code below version 2.8, but upon re-upgrading to code version 2.8+, before bumping the IBP, all partition metadata files will be deleted to prevent any errors.
Reviewers: Lucas Bradstreet <lucas@confluent.io>, David Jacot <djacot@confluent.io>