When running in KRaft mode, LogManager.startup is called in a different thread than the main broker
startup thread (by BrokerMetadataPublisher when the first metadata update is received) (#14239). If a fatal
error happens during broker startup, before LogManager.startup has completed, LogManager.shutdown may
improperly mark log dirs as cleanly shut down.
This PR includes the following changes:
1. During LogManager startup:
- track hadCleanShutdown info for each log dir
- track loadLogsCompleted status for each log dir
2. During LogManager shutdown:
- do not write the clean shutdown marker file for log dirs that have hadCleanShutdown==false and loadLogsCompleted==false (see the sketch below)
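A minimal Java sketch of the per-log-dir bookkeeping described above; the class, method names, and marker-file handling are hypothetical simplifications, not the actual LogManager code:
```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LogDirShutdownTracker {
    private static final String CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown";

    // Populated during startup for each log dir.
    private final Map<Path, Boolean> hadCleanShutdown = new ConcurrentHashMap<>();
    private final Map<Path, Boolean> loadLogsCompleted = new ConcurrentHashMap<>();

    public void onStartup(Path logDir) throws IOException {
        Path marker = logDir.resolve(CLEAN_SHUTDOWN_FILE);
        // Record whether the previous run shut down cleanly, then consume the marker.
        hadCleanShutdown.put(logDir, Files.deleteIfExists(marker));
        loadLogsCompleted.put(logDir, false);
    }

    public void onLoadLogsCompleted(Path logDir) {
        loadLogsCompleted.put(logDir, true);
    }

    public void onShutdown(Path logDir) throws IOException {
        boolean clean = hadCleanShutdown.getOrDefault(logDir, false);
        boolean loaded = loadLogsCompleted.getOrDefault(logDir, false);
        // Only claim a clean shutdown if we either inherited one or finished loading logs.
        if (clean || loaded) {
            Files.createFile(logDir.resolve(CLEAN_SHUTDOWN_FILE));
        }
    }
}
```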
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Fixed a regression described in KAFKA-15053 where security.protocol only allowed uppercase values like PLAINTEXT, SSL, SASL_PLAINTEXT and SASL_SSL. With this fix, both lowercase and uppercase values are supported (e.g. PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL, plaintext, ssl, sasl_plaintext, sasl_ssl).
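For illustration only, a client configuration such as the following (with a lowercase value) is the kind of input that should be accepted after the fix; the broker address and use of the Admin client here are assumptions, not the component the regression was reported against:
```java
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class LowercaseSecurityProtocolExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Lowercase form of the security protocol; equivalent to PLAINTEXT after the fix.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "plaintext");
        try (Admin admin = Admin.create(props)) {
            System.out.println(admin.describeCluster().nodes().get());
        }
    }
}
```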
Reviewers: Chris Egerton <chrise@aiven.io>, Divij Vaidya <diviv@amazon.com>
This patch implements the second part of KIP-915. It bumps the versions of the value records used by the group coordinator and the transaction coordinator to make them flexible versions. The new versions are not used when writing to the partitions, only when reading from them. This allows downgrades from future versions that will include tagged fields.
Reviewers: David Jacot <djacot@confluent.io>
This patch implements the first part of KIP-915. It updates the group coordinator and the transaction coordinator to ignore unknown record types while loading their respective state from the partitions. This allows downgrades from future versions that will include new record types.
Reviewers: Alexandre Dupriez <alexandre.dupriez@gmail.com>, David Jacot <djacot@confluent.io>
FinalizedFeatureChangeListener shuts the broker down when it encounters an issue trying to process feature change
events. However, it does not distinguish between feature changes actually failing and other
exceptions like ZooKeeper session expiration. This introduces the possibility that a ZooKeeper session expiration
could cause the broker to shut down, which is not intended. This patch updates the code to distinguish between
these two types of exceptions. In the case of something like a ZooKeeper session expiration, it logs a warning and continues.
We shut the broker down only for FeatureCacheUpdateException.
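A hedged sketch of the intended handling; the exception name mirrors the description above, while the event loop and fault handler are hypothetical stand-ins for the listener's internals:
```java
public final class FeatureChangeEventLoop {
    /** Mirrors the exception named above; the real class lives in the broker code. */
    public static class FeatureCacheUpdateException extends RuntimeException {
        public FeatureCacheUpdateException(String message) { super(message); }
    }

    public interface FaultHandler { void shutdownBroker(String reason); }

    private final FaultHandler faultHandler;

    public FeatureChangeEventLoop(FaultHandler faultHandler) {
        this.faultHandler = faultHandler;
    }

    public void process(Runnable featureChangeEvent) {
        try {
            featureChangeEvent.run();
        } catch (FeatureCacheUpdateException e) {
            // A real failure to apply a finalized feature change: shut the broker down.
            faultHandler.shutdownBroker("Failed to update the finalized feature cache: " + e.getMessage());
        } catch (Exception e) {
            // Transient issues such as a ZooKeeper session expiration: warn and keep going.
            System.err.println("WARN: transient error while processing feature change event: " + e);
        }
    }
}
```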
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Christo Lolov <christololov@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
We have seen the following error in logs:
```
"Mar 22, 2019 @ 21:57:56.655",Error,"kafka-0-0","transaction-log-manager-0","Uncaught exception in scheduled task 'transactionalId-expiration'","java.lang.IllegalArgumentException: Illegal new producer epoch -1
```
Investigations showed that it is actually possible for a transaction metadata object to still have -1 as its producer epoch when it transitions to Dead.
When transaction metadata is created for the first time (in handleInitProducerId), it has -1 as its producer epoch. Then a producer epoch is attributed and the transaction coordinator tries to persist the change. If the write fails, for instance because the partition is under min ISR, the transaction metadata keeps -1 as its epoch forever, or until InitProducerId is retried.
This means that it is possible for transaction metadata to remain with -1 as its producer epoch until it gets expired. At the moment, this is not allowed because we enforce a producer epoch greater than or equal to 0 in prepareTransitionTo.
Reviewers: Luke Chen <showuon@gmail.com>, Justine Olshan <jolshan@confluent.io>
When a leader becomes a follower, it is likely that it has uncommitted records in its log. When it reaches out to the new leader, the leader will detect that they have diverged and will return the diverging epoch and offset. The follower truncates its log based on this.
There is a small caveat in this process. When the leader returns the diverging epoch and offset, it also includes its high watermark, low watermark, start offset and end offset. The current code in the `AbstractFetcherThread` works as follows: first it processes the partition data, and then it checks whether there is a diverging epoch/offset. The former may accidentally expose uncommitted records, as this step updates the local high watermark to whatever is received from the leader. Since the follower, or rather the former leader, may have uncommitted records, it can update the high watermark to a larger offset if the leader reports a higher watermark than the current local one. This results in exposing uncommitted records until the log is finally truncated. The time window is short, but a fetch request arriving at the right time at the follower could read those records. This is especially true for clients that use recent versions of the fetch request without implementing KIP-320.
When this happens, the follower logs the following messages:
* `Truncating XXX to offset 21434 below high watermark 21437`
* `Non-monotonic update of high watermark from (offset=21437 segment=[20998:98390]) to (offset=21434 segment=[20998:97843])`.
This patch mitigates the issue by first checking whether a diverging epoch/offset is provided by the leader and skipping the processing of the partition data if it is. This basically means that the first fetch request results in truncating the log, and a subsequent fetch request updates the low/high watermarks.
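A simplified sketch of the reordering; the types and method names are hypothetical stand-ins for the fetcher logic, not the actual `AbstractFetcherThread` API:
```java
import java.util.Optional;

public class FetchResponseHandler {
    public record DivergingEpoch(int epoch, long endOffset) {}
    public record PartitionData(long highWatermark, long logStartOffset,
                                Optional<DivergingEpoch> divergingEpoch) {}

    public interface PartitionState {
        void truncateTo(long offset);
        void append(PartitionData data); // also updates the local high watermark / log start offset
    }

    public void handle(PartitionState state, PartitionData data) {
        if (data.divergingEpoch().isPresent()) {
            // Truncate first; do not apply the leader's watermarks in this round,
            // otherwise uncommitted records could be briefly exposed to fetchers.
            state.truncateTo(data.divergingEpoch().get().endOffset());
            return; // the next fetch will update the low/high watermarks
        }
        state.append(data);
    }
}
```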
Reviewers: Ritika Reddy <rreddy@confluent.io>, Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
This change makes sure that Kafka grabs a log dir lock in the following additional cases (see the sketch after the list):
1. When a Kafka node runs as a controller only. The current implementation doesn't grab a file lock because the LogManager is never instantiated.
2. When the metadata log dir is different from the log dir(s). The current implementation of LogManager doesn't load or grab a lock on the metadata dir.
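For illustration, acquiring an exclusive lock file on a directory can be done roughly as follows; this is a generic `java.nio` sketch, not the locking helper Kafka actually uses:
```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DirectoryLockExample {
    public static FileLock lockDir(Path dir) throws IOException {
        FileChannel channel = FileChannel.open(
                dir.resolve(".lock"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock lock = channel.tryLock();
        if (lock == null) {
            channel.close();
            throw new IOException("Directory " + dir + " is already locked by another process");
        }
        return lock;
    }

    public static void main(String[] args) throws IOException {
        FileLock lock = lockDir(Path.of(args[0]));
        System.out.println("Acquired lock on " + args[0]);
        lock.release();
    }
}
```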
Reviewers: Ron Dagostino <rdagostino@confluent.io> , dengziming <dengziming1993@gmail.com>
ZkMetadataCache.getClusterMetadata returned a Cluster object where the aliveNodes were
missing their rack info.
Problem: when ZkMetadataCache updates the metadataSnapshot, it includes the rack in
aliveBrokers but not in aliveNodes.
Trivial fix with a matching assertion in an existing unit test.
Note that the Cluster object returned from `MetadataCache.getClusterMetadata(...)`
is passed to `ClientQuotaCallback.updateClusterMetadata(...)`
so it is used, though by service providers rather than by clients.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Older clients can not handle the `REQUEST_TIMED_OUT` error that is returned from `InitProducerId` when the next producerId block cannot be fetched from the controller. In this patch, we return `COORDINATOR_LOAD_IN_PROGRESS` instead which is retriable.
Reviewers: Jason Gustafson <jason@confluent.io>
The test was added with a fix for KAFKA-14379; the problem was that the replication factor of the offsets topic was 1, so the consumer group coordinator became unavailable when one of the brokers was shut down.
Reviewers: David Jacot <djacot@confluent.io>
In BrokerToControllerChannelManager, set the request timeout to the minimum of the retry timeout
and the controller socket timeout. This fixes some cases where we were unintentionally setting an
overly long request timeout.
Also, the channel manager used by the BrokerLifecycleManager should set a retry timeout equal to
half of the broker session timeout, rather than the entire broker session timeout, to allow for a
retransmission if the initial attempt fails.
These two fixes should address some cases where heartbeat broker requests were not being resent
in a timely fashion after a network glitch.
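A minimal sketch of the two timeout rules described above; the method and parameter names are illustrative, not the actual channel manager API:
```java
public class ChannelManagerTimeouts {
    /** Request timeout for broker-to-controller channels: never longer than the retry budget. */
    public static int requestTimeoutMs(int retryTimeoutMs, int controllerSocketTimeoutMs) {
        return Math.min(retryTimeoutMs, controllerSocketTimeoutMs);
    }

    /** Retry timeout for the BrokerLifecycleManager channel: half the session, leaving room for a resend. */
    public static int lifecycleRetryTimeoutMs(int brokerSessionTimeoutMs) {
        return brokerSessionTimeoutMs / 2;
    }

    public static void main(String[] args) {
        System.out.println(requestTimeoutMs(30_000, 60_000));  // 30000
        System.out.println(lifecycleRetryTimeoutMs(9_000));    // 4500
    }
}
```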
Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@apache.org>
The consumer (fetcher) used to refresh the preferred read replica only under
three conditions:
1. the consumer receives an OFFSET_OUT_OF_RANGE error
2. the follower does not exist in the client's metadata (i.e., offline)
3. after metadata.max.age.ms (5 min default)
For other errors, it will continue to reach out to the possibly unavailable
follower, and only after 5 minutes will it refresh the preferred read
replica and go back to the leader.
Another problem is that the client might have stale metadata and not
send fetches to preferred replica, even after the leader redirects to
the preferred replica.
A specific example is when a partition is reassigned. The consumer will
get NOT_LEADER_OR_FOLLOWER, which triggers a metadata update, but the
preferred read replica will not be refreshed because the follower is still
online. It will continue to reach out to the old follower until the
preferred read replica expires.
The consumer can instead refresh its preferred read replica whenever it
makes a metadata update request, so when the consumer receives e.g.
NOT_LEADER_OR_FOLLOWER it can find the new preferred read replica without
waiting for the expiration.
Generally, we will rely on the leader to choose the correct preferred
read replica and have the consumer fail fast (clear preferred read replica
cache) on errors and reach out to the leader.
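A simplified sketch of the new behavior; this is hypothetical fetcher-side state, not the actual consumer internals:
```java
import java.util.Optional;

// Clear the cached preferred read replica whenever metadata is refreshed, so the
// next fetch goes back to the leader, which can then nominate a new preferred replica.
public class PreferredReadReplicaState {
    private Optional<Integer> preferredReadReplica = Optional.empty();

    public void onPreferredReadReplicaFromLeader(int replicaId) {
        preferredReadReplica = Optional.of(replicaId);
    }

    public void onMetadataUpdate() {
        // Previously the cached replica only expired after metadata.max.age.ms;
        // now we fail fast and let the leader re-select.
        preferredReadReplica = Optional.empty();
    }

    public int fetchTarget(int leaderId) {
        return preferredReadReplica.orElse(leaderId);
    }
}
```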
Co-authored-by: Jeff Kim <jeff.kim@confluent.io>
Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
With KRaft, the cluster metadata topic (__cluster_metadata) has a different implementation compared to regular topics. The user should not be allowed to create this topic, as doing so can cause issues if the metadata log dir is the same as one of the log dirs.
This change returns an authorization error if the user tries to create the cluster metadata topic.
Reviewers: David Arthur <mumrah@gmail.com>
We recently had a bug causing the JoinGroup callback to throw an exception (https://github.com/apache/kafka/pull/12909). When it happens, the exception is propagated to the caller and the JoinGroup callback is never completed. To make it worse, the member whose callback failed becomes a zombie because the group coordinator does not expire members with a pending callback.
This patch catches exceptions thrown by both the JoinGroup and SyncGroup callbacks and retries completing them with an `UNKNOWN_SERVER_ERROR` error if they failed.
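A minimal sketch of the guard; the callback and error types are simplified stand-ins for the group coordinator code:
```java
import java.util.function.Consumer;

// If invoking the JoinGroup/SyncGroup callback throws, retry completing it with
// UNKNOWN_SERVER_ERROR so the member never gets stuck with a pending callback.
public class CallbackGuard {
    public enum Error { NONE, UNKNOWN_SERVER_ERROR }

    public record Response(Error error) {}

    public static void complete(Consumer<Response> callback, Response response) {
        try {
            callback.accept(response);
        } catch (Exception e) {
            System.err.println("ERROR: callback failed, completing with UNKNOWN_SERVER_ERROR: " + e);
            callback.accept(new Response(Error.UNKNOWN_SERVER_ERROR));
        }
    }
}
```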
Reviewers: Jason Gustafson <jason@confluent.io>
When consumers use the static membership protocol, they cannot update the rebalance timeout because the group coordinator simply ignores any new value. This patch fixes this.
Reviewers: David Jacot <djacot@confluent.io>
The default replica selector chooses a replica based on whether the broker.rack matches the client.rack in the fetch request and whether the offset exists in the follower. If the follower is not in the ISR, we know it is lagging behind, which will also cause the consumer to lag behind. There are two cases:
1. The follower recovers and joins the ISR; the consumer no longer falls behind.
2. The follower continues to lag behind. After 5 minutes, the consumer refreshes its preferred read replica, and the leader returns the same lagging follower since the offset the consumer has fetched up to is capped by the follower's HWM. This can go on indefinitely.
If the replica selector chooses a broker in the ISR then we can ensure that at least every 5 minutes the consumer will consume from an up-to-date replica.
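A simplified sketch of the selection rule using generic types, not the actual `ReplicaSelector` plug-in interface:
```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Rack-aware selection restricted to in-sync replicas, so a lagging
// out-of-sync follower is never handed back to the consumer.
public class RackAwareInSyncSelector {
    public record Replica(int id, String rack) {}

    public static Optional<Replica> select(String clientRack,
                                           List<Replica> replicas,
                                           Set<Integer> isr) {
        return replicas.stream()
                .filter(r -> isr.contains(r.id()))   // restrict to the ISR
                .filter(r -> r.rack() != null && r.rack().equals(clientRack))
                .findFirst();                        // caller falls back to the leader when empty
    }
}
```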
Reviewers: David Jacot <djacot@confluent.io>
This fixes a bug that causes a call to producer.send(record) with a record without a key and configured with batch.size=0 to never return.
Without a key or a custom partitioner, the new BuiltInPartitioner, as described in KIP-749, kicks in.
BuiltInPartitioner seems to have been designed with the reasonable assumption that the batch size will never be lower than one.
However, the documentation for the producer configuration states that batch.size=0 is a valid value, and even recommends its use directly. [1]
[1] clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:87
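For reference, the failing scenario can be reproduced with a configuration along these lines; the broker address and topic name are illustrative:
```java
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchSizeZeroExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Documented as valid; with the bug, a keyless send could hang forever.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // No key and no custom partitioner, so the built-in partitioner is used.
            Future<RecordMetadata> result = producer.send(new ProducerRecord<>("test-topic", "value"));
            System.out.println(result.get());
        }
    }
}
```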
Reviewers: Artem Livshits <alivshits@confluent.io>, Luke Chen <showuon@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
When a consumer makes a fetch request to a follower (KIP-392), the fetch request will sit in the purgatory until `fetch.max.wait.ms` is reached because the purgatory is not completed after replication. This patch aims to complete the delayed fetch purgatory after successfully replicating from the leader.
Reviewers: Artem Livshits <alivshits@confluent.io>, Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
In https://github.com/apache/kafka/pull/11910, we added a feature to prevent topics with conflicting metric names from being created. We added a map to store the normalized topic name to the topic names, but we didn't remove entries correctly when deleting topics. This PR fixes this bug and adds a test.
Reviewers: Igor Soarez <i@soarez.me>, dengziming <dengziming1993@gmail.com>, Jason Gustafson <jason@confluent.io>
We have seen some errors such as the following:
```
org.opentest4j.AssertionFailedError: expected: not equal but was: <OptionalLong.empty>
Stacktrace
org.opentest4j.AssertionFailedError: expected: not equal but was: <OptionalLong.empty>
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39)
at org.junit.jupiter.api.AssertNotEquals.failEqual(AssertNotEquals.java:276)
at org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:265)
at org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:260)
at org.junit.jupiter.api.Assertions.assertNotEquals(Assertions.java:2815)
at kafka.server.KRaftClusterTest.$anonfun$testDescribeQuorumRequestToBrokers$5(KRaftClusterTest.scala:818)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at kafka.server.KRaftClusterTest.testDescribeQuorumRequestToBrokers(KRaftClusterTest.scala:814)
```
The patch changes some of the assertions to wait longer for the condition to be satisfied.
Reviewers: Jason Gustafson <jason@confluent.io>
When using `kafka-features.sh` with the `--feature` parameter, we expect a numeric feature level (e.g. `metadata.version=5`). The help example suggests that we can also use a descriptive version string for `metadata.version` such as `3.3-IV3`, which doesn't work.
Reviewers: David Jacot <djacot@confluent.io>
The error message reported when advertised.listeners is set on a controller-only node is confusing.
When the KRaft server is configured as a controller only, the following must be true (see the
sketch after the list):
1. `advertised.listeners` is not set
2. `listeners` contains a listener for every name in `controller.listener.names`
3. `controller.listener.names` contains a name for every listener in `listeners`
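A hedged sketch of the validation and its error messages; the helper and parameter names are hypothetical, not the actual KafkaConfig checks:
```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ControllerOnlyConfigCheck {
    public static void validate(List<String> advertisedListeners,
                                Set<String> listenerNames,
                                Set<String> controllerListenerNames) {
        if (!advertisedListeners.isEmpty()) {
            throw new IllegalArgumentException(
                "advertised.listeners must not be set when the node is a controller only");
        }
        Set<String> missing = controllerListenerNames.stream()
            .filter(name -> !listenerNames.contains(name))
            .collect(Collectors.toSet());
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                "listeners must contain a listener for every name in controller.listener.names; missing: " + missing);
        }
        Set<String> extra = listenerNames.stream()
            .filter(name -> !controllerListenerNames.contains(name))
            .collect(Collectors.toSet());
        if (!extra.isEmpty()) {
            throw new IllegalArgumentException(
                "controller.listener.names must contain a name for every listener in listeners; extra: " + extra);
        }
    }
}
```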
Reviewers: Jason Gustafson <jason@confluent.io>, Igor Soarez <i@soarez.me>
In https://github.com/apache/kafka/pull/12695, we discovered a gap in our testing of `StandardAuthorizer`. We addressed the specific case that was failing, but I think we need to establish a better methodology for testing which incorporates randomized inputs. This patch is a start in that direction. We implement a few basic property tests using jqwik which focus on prefix searching. It catches the case from https://github.com/apache/kafka/pull/12695 prior to the fix. In the future, we can extend this to cover additional operation types, principal matching, etc.
Reviewers: David Arthur <mumrah@gmail.com>
The test is failing intermittently because we do not wait for the altered config (LogRetentionTimeMillisProp) to propagate to all brokers before proceeding with the test.
This PR makes the following changes:
1. Wait for the altered configuration to propagate to all brokers.
2. Use the existing `killBroker` utility method, which waits for shutdown using `awaitShutdown`.
3. Improve code readability by using `TestUtils.incrementalAlterConfigs` to send alter config requests.
Reviewers: Jason Gustafson <jason@confluent.io>
Update security documentation to describe how to configure the KRaft `Authorizer` implementation and include a note about principal forwarding.
Additionally, this patch renames `KafkaConfig.Defaults.DefaultPrincipalSerde` to `DefaultPrincipalBuilder` since the former is somewhat misleading.
Reviewers: David Arthur <mumrah@gmail.com>
Add a KRaft controller log level entry in log4j.properties; otherwise, controller.log is empty in KRaft mode.
Reviewers: José Armando García Sancio <jsancio@apache.org>, Ismael Juma <ismael@juma.me.uk>
When the `BrokerServer` starts its shutdown process, it transitions to `SHUTTING_DOWN` and sets `isShuttingDown` to `true`. With this state change, follower state changes are short-circuited. This means that a broker which was serving as leader would keep acting as a leader until controlled shutdown completes. Instead, we want the leader and ISR state to be updated so that requests return NOT_LEADER and the client can find the new leader.
We missed this case while implementing https://github.com/apache/kafka/pull/12187.
This patch fixes the issue and updates an existing test to ensure that `isShuttingDown` has no effect. We should consider adding integration tests for this as well; we can do this separately.
Reviewers: Ismael Juma <ismael@juma.me.uk>, José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
Make all faults in metadata processing on standby controllers be fatal. This is the same behavior-wise as the active controller. This prevents a standby controller from eventually becoming active with incomplete state.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
There is a chance of deadlock when multiple threads access ClientRequestQuotaManager. In Scala 2.12, lazy val initialization happens under the object lock. The deadlock can occur as follows:
In thread A, when ClientRequestQuotaManager.exemptSensor is being initialized, it has acquired the object lock and enters the actual initialization block. The initialization of `exemptSensor` requires another lock (`private val lock = new ReentrantReadWriteLock()`) and it waits for this lock.
In thread B, at the same time, ClientQuotaManager.updateQuota() is called and has already acquired the ReentrantReadWriteLock by calling lock.writeLock().lock(). It then executes info(). If this is the first time Logging.logger, which is also a lazy val, is accessed, it needs to wait for the object lock.
The deadlock happens.
Since lazy val initialization is under the object lock, we should avoid using a lazy val if the initialization function holds another lock, to prevent holding two locks at the same time, which is prone to deadlock. The change creates exemptSensor eagerly during ClientRequestQuotaManager initialization, with an expiration time of Long.MaxValue to prevent expiration if the request quota is not enabled at that time.
Reviewers: Jason Gustafson <jason@confluent.io>
We should prevent the metadata log from initializing in a known bad state. If the log start offset of the first segment is greater than 0, then there must be a snapshot at an offset greater than or equal to it in order to ensure that the initialized state is complete.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
Disable segment deletion based on size and time by setting the KRaft metadata log's `RetentionMsProp` and `RetentionBytesProp` to `-1`. This will cause `UnifiedLog.deleteRetentionMsBreachedSegments` and `UnifiedLog.deleteRetentionSizeBreachedSegments` to short circuit instead of deleting segments.
Without this change the included test would fail. This happens because `deleteRetentionMsBreachedSegments` is able to delete past the `logStartOffset`. Deleting past the `logStartOffset` would violate the invariant that if the `logStartOffset` is greater than 0, then there is a snapshot with an end offset greater than or equal to the log start offset.
Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
Currently, quotas are not applied to forwarded requests on either the controller or the broker. The controller-side throttling requires the controller to apply the quota changes from the log to the quota managers, which will be done separately. In this patch, we change the response logic on the broker side to also apply the broker's request quota. The enforced throttle time is the maximum of the throttle returned from the controller (which is 0 until we fix the aforementioned issue) and the broker's request throttle time.
Reviewers: David Arthur <mumrah@gmail.com>
The consumer group instance ID is used to support a notion of "static" consumer groups. The idea is to be able to identify the same group instance across restarts so that a rebalance is not needed. However, if the user sets `group.instance.id` in the consumer configuration but uses "simple" assignment with `assign()`, the instance ID is nevertheless sent in the OffsetCommit request to the coordinator. This may result in a surprising UNKNOWN_MEMBER_ID error.
This PR attempts to fix this issue for existing consumers by relaxing the validation in this case. One way is to simply ignore the member ID and the static ID when the generation ID is -1, which signals that the request comes from either the admin client or a consumer that does not use group management. This does not apply to transactional offset commits.
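For reference, the surprising combination looks roughly like this; the broker address, group, instance, and topic names are illustrative:
```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticIdWithManualAssignment {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Static membership id set even though the consumer never calls subscribe().
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(List.of(new TopicPartition("test-topic", 0)));
            consumer.poll(Duration.ofMillis(100));
            // Before the fix, this OffsetCommit could fail with UNKNOWN_MEMBER_ID because
            // the request carries the instance id with generation -1.
            consumer.commitSync();
        }
    }
}
```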
Reviewers: Jason Gustafson <jason@confluent.io>
This adds a new configuration `sasl.server.max.receive.size` that sets the maximum receive size for requests before and during authentication.
Reviewers: Tom Bentley <tbentley@redhat.com>, Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Manikumar Reddy <manikumar.reddy@gmail.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
When deserializing KRPC (which is used for RPCs sent to Kafka, Kafka Metadata records, and some
other things), check that we have at least N bytes remaining before allocating an array of size N.
Remove DataInputStreamReadable since it was hard to make this class aware of how many bytes were
remaining. Instead, when reading an individual record in the Raft layer, simply create a
ByteBufferAccessor with a ByteBuffer containing just the bytes we're interested in.
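The core idea, as a hedged sketch; the helper and method names are hypothetical, not the generated KRPC reader API:
```java
import java.nio.ByteBuffer;

// Validate the declared length against the bytes actually remaining
// before allocating the array.
public class SafeArrayReader {
    public static byte[] readByteArray(ByteBuffer buffer) {
        int length = buffer.getInt();
        if (length < 0) {
            throw new RuntimeException("Invalid negative array length " + length);
        }
        if (length > buffer.remaining()) {
            // Without this check, a corrupt or malicious length could trigger a huge allocation.
            throw new RuntimeException("Array of size " + length
                + " cannot be read; only " + buffer.remaining() + " bytes remaining");
        }
        byte[] result = new byte[length];
        buffer.get(result);
        return result;
    }
}
```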
Add SimpleArraysMessageTest and ByteBufferAccessorTest. Also add some additional tests in
RequestResponseTest.
Reviewers: Tom Bentley <tbentley@redhat.com>, Mickael Maison <mickael.maison@gmail.com>, Colin McCabe <colin@cmccabe.xyz>
Co-authored-by: Colin McCabe <colin@cmccabe.xyz>
Co-authored-by: Manikumar Reddy <manikumar.reddy@gmail.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
kafka-features.sh must exit with a non-zero error code on error. We must do this in order to catch
regressions like KAFKA-13990.
Reviewers: David Arthur <mumrah@gmail.com>
Fix a bug in ReplicationControlManager where we got a NullPointerException when removing a topic
that had no offline replicas while other topics did have offline replicas.
Fix an issue in MetadataDelta#replay where we were replaying RemoveTopicRecord twice.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>
Previously, the KRaft controller was incorrectly reporting an empty feature set in
ApiVersionResponse. This was preventing any multi-node clusters from being upgraded via
kafka-features.sh, since they would incorrectly believe that metadata.version was not a supported
feature. This PR adds a regression test for this bug, KRaftClusterTest.testUpdateMetadataVersion.
Reviewers: José Armando García Sancio <jsancio@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
This test was removed in #11667 since UpdateFeatures was not properly handled in KRaft mode. Now we can bring it back, since UpdateFeatures is properly handled after #12036.
Reviewers: Luke Chen <showuon@gmail.com>
This PR adds support to kafka-features.sh for the --metadata flag, as specified in KIP-778. This
flag makes it possible to upgrade to a new metadata version without consulting a table mapping
version names to short integers. Change --feature to use a key=value format.
FeatureCommandTest.scala: make most tests here true unit tests (that don't start brokers) in order
to improve test run time, and allow us to test more cases. For the integration test part, test both
KRaft and ZK-based clusters. Add support for mocking feature operations in MockAdminClient.java.
upgrade.html: add a section describing how the metadata.version should be upgraded in KRaft
clusters.
Add kraft_upgrade_test.py to test upgrades between KRaft versions.
Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@gmail.com>