This patch implemented the second part of KIP-915. It bumps the versions of the value records used by the group coordinator and the transaction coordinator to make them flexible versions. The new versions are not used when writing to the partitions but only when reading from the partitions. This allows downgrades from future versions that will include tagged fields.
Reviewers: David Jacot <djacot@confluent.io>
This patch implemented the first part of KIP-915. It updates the group coordinator and the transaction coordinator to ignores unknown record types while loading their respective state from the partitions. This allows downgrades from future versions that will include new record types.
Reviewers: Alexandre Dupriez <alexandre.dupriez@gmail.com>, David Jacot <djacot@confluent.io>
FinalizedFeatureChangeListener shuts the broker down when it encounters an issue trying to process feature change
events. However, it does not distinguish between issues related to feature changes actually failing and other
exceptions like ZooKeeper session expiration. This introduces the possibility that Zookeeper session expiration
could cause the broker to shutdown, which is not intended. This patch updates the code to distinguish between
these two types of exceptions. In the case of something like a ZK session expiration it logs a warning and continues.
We shutdown the broker only for FeatureCacheUpdateException.
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Christo Lolov <christololov@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
We have seen the following error in logs:
```
"Mar 22, 2019 @ 21:57:56.655",Error,"kafka-0-0","transaction-log-manager-0","Uncaught exception in scheduled task 'transactionalId-expiration'","java.lang.IllegalArgumentException: Illegal new producer epoch -1
```
Investigations showed that it is actually possible for a transaction metadata object to still have -1 as producer epoch when it transitions to Dead.
When a transaction metadata is created for the first time (in handleInitProducerId), it has -1 as its producer epoch. Then a producer epoch is attributed and the transaction coordinator tries to persist the change. If the write fail for instance because there is an under min isr, the transaction metadata remains with its epoch as -1 forever or until the init producer id is retried.
This means that it is possible for transaction metadata to remain with -1 as producer epoch until it gets expired. At the moment, this is not allowed because we enforce a producer epoch greater or equals to 0 in prepareTransitionTo.
Reviewers: Luke Chen <showuon@gmail.com>, Justine Olshan <jolshan@confluent.io>
When a leader becomes a follower, it is likely that it has uncommitted records in its log. When it reaches out to the leader, the leader will detect that they have diverged and it will return the diverging epoch and offset. The follower truncates it log based on this.
There is a small caveat in this process. When the leader return the diverging epoch and offset, it also includes its high watermark, low watermark, start offset and end offset. The current code in the `AbstractFetcherThread` works as follow. First it process the partition data and then it checks whether there is a diverging epoch/offset. The former may accidentally expose uncommitted records as this step updates the local watermark to whatever is received from the leader. As the follower, or the former leader, may have uncommitted records, it will be able to updated the high watermark to a larger offset if the leader has a higher watermark than the current local one. This result in exposing uncommitted records until the log is finally truncated. The time window is short but a fetch requests coming at the right time to the follower could read those records. This is especially true for clients out there which uses recent versions of the fetch request but without implementing KIP-320.
When this happens, the follower logs the following messages:
* `Truncating XXX to offset 21434 below high watermark 21437`
* `Non-monotonic update of high watermark from (offset=21437 segment=[20998:98390]) to (offset=21434 segment=[20998:97843])`.
This patch proposes to mitigate the issue by starting by checking on whether a diverging epoch/offset is provided by the leader and skip processing the partition data if it is. This basically means that the first fetch request will result in truncating the log and a subsequent fetch request will update the low/high watermarks.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
This adds a new configuration `sasl.server.max.receive.size` that sets the maximum receive size for requests before and during authentication.
Reviewers: Tom Bentley <tbentley@redhat.com>, Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Manikumar Reddy <manikumar.reddy@gmail.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
When deserializing KRPC (which is used for RPCs sent to Kafka, Kafka Metadata records, and some
other things), check that we have at least N bytes remaining before allocating an array of size N.
Remove DataInputStreamReadable since it was hard to make this class aware of how many bytes were
remaining. Instead, when reading an individual record in the Raft layer, simply create a
ByteBufferAccessor with a ByteBuffer containing just the bytes we're interested in.
Add SimpleArraysMessageTest and ByteBufferAccessorTest. Also add some additional tests in
RequestResponseTest.
Reviewers: Tom Bentley <tbentley@redhat.com>, Mickael Maison <mickael.maison@gmail.com>, Colin McCabe <colin@cmccabe.xyz>
Co-authored-by: Colin McCabe <colin@cmccabe.xyz>
Co-authored-by: Manikumar Reddy <manikumar.reddy@gmail.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Fix two bugs related to dynamic broker configs in KRaft. The first bug is that we are calling reloadUpdatedFilesWithoutConfigChange when a topic configuration is changed, but not when a
broker configuration is changed. This is backwards. This function must be called only for broker
configs, and never for topic configs or cluster configs.
The second bug is that there were several configurations such as max.connections which are related
to broker listeners, but which do not involve changing the registered listeners. We should support
these configurations in KRaft. This PR fixes the configuration change validation to support this case.
Reviewers: Jason Gustafson <jason@confluent.io>, Matthew de Detrich <mdedetrich@gmail.com>
In KAFKA-13310, we tried to fix a issue that consumer#poll(duration) will be returned after the provided duration. It's because if rebalance needed, we'll try to commit current offset first before rebalance synchronously. And if the offset committing takes too long, the consumer#poll will spend more time than provided duration. To fix that, we change commit sync with commit async before rebalance (i.e. onPrepareJoin).
However, in this ticket, we found the async commit will keep sending a new commit request during each Consumer#poll, because the offset commit never completes in time. The impact is that the existing consumer will be kicked out of the group after rebalance timeout without joining the group. That is, suppose we have consumer A in group G, and now consumer B joined the group, after the rebalance, only consumer B in the group.
Besides, there's also another bug found during fixing this bug. Before KAFKA-13310, we commitOffset sync with rebalanceTimeout, which will retry when retriable error until timeout. After KAFKA-13310, we thought we have retry, but we'll retry after partitions revoking. That is, even though the retried offset commit successfully, it still causes some partitions offsets un-committed, and after rebalance, other consumers will consume overlapping records.
Reviewers: RivenSun <riven.sun@zoom.us>, Luke Chen <showuon@gmail.com>
Currently, preferredReplicaImbalanceCount calculation has a race that becomes negative when topic deletion is initiated simultaneously. This PR addresses the problem by fixing cleanPreferredReplicaImbalanceMetric to be called only once per topic-deletion procedure
Reviewers: Luke Chen <showuon@gmail.com>
When cleaning a topic with transactional data, if the keys used in the user data happen to conflict with the keys in the transaction markers, it is possible for the markers to get removed before the corresponding data from the transaction is removed. This results in a hanging transaction or the loss of the transaction's atomicity since it would effectively get bundled into the next transaction in the log. Currently control records are excluded when building the offset map, but not when doing the cleaning. This patch fixes the problem by checking for control batches in the `shouldRetainRecord` callback.
Reviewers: Jun Rao <junrao@gmail.com>
What:
When a certificate is rotated on a broker via dynamic configuration and the previous certificate expires, the broker to controller connection starts failing with SSL Handshake failed.
Why:
A similar fix was earlier performed in #6721 but when BrokerToControllerChannelManager was introduced in v2.7, we didn't enable dynamic reconfiguration for it's channel.
Summary of testing strategy (including rationale)
Add a test which fails prior to the fix done in the PR and succeeds afterwards. The bug wasn't caught earlier because there was no test coverage to validate the scenario.
Reviewers: Luke Chen <showuon@gmail.com>
When logManager startup and loadLogs, we expect to catch any IOException (ex: out of space error) and turn the log dir into offline. Later, we'll handle the offline logDir in ReplicaManage, so that the cleanShutdown file won't be created when all logDirs are offline. The reason why the broker shutdown with cleanShutdown file after full disk is because during loadLogs and do log recovery, we'll write leader-epoch-checkpoint fil. And if any IOException thrown, we'll wrap it as KafkaStorageException and rethrow. And since we don't catch KafkaStorageException, so the exception is caught in the other place and go with clean shutdown path.
This PR is to fix the issue by catching the KafkaStorageException with IOException cause exceptions during loadLogs, and mark the logDir as offline to let the ReplicaManager handle the offline logDirs.
Reviewers: Jun Rao <jun@confluent.io>, Alok Thatikunta <alok123thatikunta@gmail.com>
The KRaft implementation of the `CreatePartitions` ignores the `validateOnly` flag in the
request and creates the partitions if the validations are successful. Fixed the behavior
not to create partitions upon validation if the `validateOnly` flag is true.
Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, dengziming <dengziming1993@gmail.com>, Jason Gustafson <jason@confluent.io>
With KAFKA-13527 / KIP-784 we introduced a new top-level error code for
the DescribeLogDirs API for versions 3 and above. However, the change
regressed the error handling for versions less than 3 since the response
converter fails to write the non-zero error code out (rightly) for
versions lower than 3 and drops the response to the client which
eventually times out instead of receiving an empty log dirs response and
processing that as a Cluster Auth failure.
With this change, the API conditionally propagates the error code out to
the client if the request API version is 3 and above. This keeps the
semantics of the error handling the same for all versions and restores
the behavior for older versions.
See current behavior in the broker log:
```bash
ERROR] 2022-04-08 01:22:56,406 [data-plane-kafka-request-handler-10] kafka.server.KafkaApis - [KafkaApi-0] Unexpected error handling request RequestHeader(apiKey=DESCRIBE_LOG_DIRS, apiVersion=0, clientId=sarama, correlationId=1) -- DescribeLogDirsRequestData(topics=null)
org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default errorCode at version 0
[ERROR] 2022-04-08 01:22:56,407 [data-plane-kafka-request-handler-10] kafka.server.KafkaRequestHandler - [Kafka Request Handler 10 on Broker 0], Exception when handling request
org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default errorCode at version 0
```
Reviewers: Ismael Juma <ismael@juma.me.uk>
Partitions are assigned to fetcher threads based on their hash modulo the number of fetcher threads. When we resize the fetcher thread pool, we basically re-distribute all the partitions based on the new fetcher thread pool size. The issue is that the logic that resizes the fetcher thread pool updates the `fetcherThreadMap` while iterating over it. The `Map` does not give any guarantee in this case - especially when the underlying map is re-hashed - and that led to not iterating over all the fetcher threads during the process and thus in leaving some partitions in the wrong fetcher threads.
Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
In KIP-815 we replaced KafkaConsumer with AdminClient in GetOffsetShell. In the previous implementation, partitions were just ignored if there is no offset for them, however, we will print -1 instead now, This PR fix this inconsistency.
Reviewers: David Jacot <djacot@confluent.io>, Luke Chen <showuon@gmail.com>
When a replica selector is configured, the partition leader computes a preferred read replica for any fetch from the consumers. When the preferred read replica is not the leader, the leader returns the preferred read replica with `FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)` to the `ReplicaManager`. This causes the fetch to go into in the fetch purgatory because the exit conditions are not met. In turns out that the delayed fetch is not completed until the timeout is reached because the delayed fetch ignores partition with an unknown offset (-1). If the fetch contains only one partition, the fetch is unnecessarily delayed by the timeout time (500ms by default) to only inform the consumer that it has to read from a follower.
This patch fixes the issue by completing the fetch request immediately when a preferred read replica is defined.
Reviewers: David Jacot <djacot@confluent.io>
Implement auto leader rebalance for KRaft by keeping track of the set of topic partitions which have a leader that is not the preferred replica. If this set is non-empty then schedule a leader balance event for the replica control manager.
When applying PartitionRecords and PartitionChangeRecords to the ReplicationControlManager, if the elected leader is not the preferred replica then remember this topic partition in the set of imbalancedPartitions.
Anytime the quorum controller processes a ControllerWriteEvent it schedules a rebalance operation if the there are no pending rebalance operations, the feature is enabled and there are imbalance partitions.
This KRaft implementation only supports the configurations properties auto.leader.rebalance.enable and leader.imbalance.check.interval.seconds. The configuration property leader.imbalance.per.broker.percentage is not supported and ignored.
Reviewers: Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
Implementation of the protocol for starting and stopping leader recovery after an unclean leader election. This includes the management of state in the controllers (legacy and KRaft) and propagating this information to the brokers. This change doesn't implement log recovery after an unclean leader election.
Protocol Changes
================
For the topic partition state znode, the new field "leader_recovery_state" was added. If the field is missing the value is assumed to be RECOVERED.
ALTER_PARTITION was renamed from ALTER_ISR. The CurrentIsrVersion field was renamed to PartitionEpoch. The new field LeaderRecoveryState was added.
The new field LeaderRecoverState was added to the LEADER_AND_ISR request. The inter broker protocol version is used to determine which version to send to the brokers.
A new tagged field for LeaderRecoveryState was added to both the PartitionRecord and PartitionChangeRecord.
Controller
==========
For both the KRaft and legacy controller the LeaderRecoveryState is set to RECOVERING, if the leader was elected out of the ISR, also known as unclean leader election. The controller sets the state back to RECOVERED after receiving an ALTER_PARTITION request with version 0, or with version 1 and with the LeaderRecoveryState set to RECOVERED.
Both controllers preserve the leader recovery state even if the unclean leader goes offline and comes back online before an RECOVERED ALTER_PARTITION is sent.
The controllers reply with INVALID_REQUEST if the ALTER_PARTITION either:
1. Attempts to increase the ISR while the partition is still RECOVERING
2. Attempts to change the leader recovery state to RECOVERING from a RECOVERED state.
Topic Partition Leader
======================
The topic partition leader doesn't implement any log recovery in this change. The topic partition leader immediately marks the partition as RECOVERED and sends that state in the next ALTER_PARTITION request.
Reviewers: Jason Gustafson <jason@confluent.io>
It is possible to clean a segment partially if the offset map is filled before reaching the end of the segment. The highest offset that is reached becomes the new dirty offset after the cleaning completes. The data above this offset is nevertheless copied over to the new partially cleaned segment. Hence we need to ensure that the transaction index reflects aborted transactions from both the cleaned and uncleaned portion of the segment. Prior to this patch, this was not the case. We only collected the aborted transactions from the cleaned portion, which means that the reconstructed index could be incomplete. This can cause the aborted data to become effectively committed. It can also cause the deletion of the abort marker before the corresponding data has been removed (i.e. the aborted transaction becomes hanging).
Reviewers: Jun Rao <junrao@gmail.com>
I collected a list of the most flaky tests observed lately, checked / created their corresponding tickets, and mark them as ignored for now. Many of these failures are:
0. Failing very frequently in the past (at least in my observations).
1. not investigated for some time.
2. have a PR for review (mostly thanks to @showuon !), but not reviewed for some time.
Since 0), these tests failures are hindering our development; and from 1/2) above, people are either too busy to look after them, or honestly the tests are not considered as providing values since otherwise people should care enough to panic and try to resolve. So I think it's reasonable to disable all these tests for now. If we later learned our lesson a hard way, it would motivate us to tackle flaky tests more diligently as well.
I'm only disabling those tests that have been failed for a while, and if for such time no one have been looking into them, I'm concerned that just gossiping around about those flakiness would not bring people's attention to them either. So my psychological motivation is that "if people do not care about those failed tests for weeks (which, is not a good thing! :P), let's teach ourselves the lesson a hard way when it indeed buries a bug that bites us, or not learn the lesson at all --- that indicates those tests are indeed not valuable". For tests that I only very recently saw I did not disable them.
Reviewers: John Roesler <vvcephei@apache.org>, Matthias J. Sax <mjsax@apache.org>, Luke Chen <showuon@gmail.com>, Randall Hauch <rhauch@gmail.com>
When a socket is closed, corresponding channel should be retained only if there is complete buffered requests.
Reviewers: David Jacot <djacot@confluent.io>
Create KafkaConfigSchema to encapsulate the concept of determining the types of configuration keys.
This is useful in the controller because we can't import KafkaConfig, which is part of core. Also
introduce the TimelineObject class, which is a more generic version of TimelineInteger /
TimelineLong.
Reviewers: David Arthur <mumrah@gmail.com>
Issue:
Imagine a scenario where two threads T1 and T2 are inside UnifiedLog.flush() concurrently:
KafkaScheduler thread T1 -> The periodic work calls LogManager.flushDirtyLogs() which in turn calls UnifiedLog.flush(). For example, this can happen due to log.flush.scheduler.interval.ms here.
KafkaScheduler thread T2 -> A UnifiedLog.flush() call is triggered asynchronously during segment roll here.
Supposing if thread T1 advances the recovery point beyond the flush offset of thread T2, then this could trip the check within LogSegments.values() here for thread T2, when it is called from LocalLog.flush() here. The exception causes the KafkaScheduler thread to die, which is not desirable.
Fix:
We fix this by ensuring that LocalLog.flush() is immune to the case where the recoveryPoint advances beyond the flush offset.
Reviewers: Jun Rao <junrao@gmail.com>
There seemed to be a little sloppiness in the integration tests in regard to admin client creation. Not only was there duplicated logic, but it wasn't always clear which listener the admin client was targeting. This made it difficult to tell in the context of authorization tests whether we were indeed testing with the right principal. As an example, we had a method in TestUtils which was using the inter-broker listener implicitly. This meant that the test was using the broker principal which had super user privilege. This was intentional, but I think it would be clearer to make the dependence on this listener explicit. This patch attempts to clean this up a bit by consolidating some of the admin creation logic and making the reliance on the listener clearer.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
There are a few integration tests for the forwarding logic which were added prior to kraft being ready for integration testing. Now that we have enabled kraft in integration tests, these tests are redundant and can be removed.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
In `KafkaServer, `ZkConfigRepository` is just a wrapper of `zkClient`, so we don't need to create a new one.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch enables `ApiVersionsTest` to test both kraft brokers and controllers. It fixes a minor bug in which the `Envelope` request to be exposed from `ApiVersions` requests to the kraft broker.
Reviewers: Jason Gustafson <jason@confluent.io>