This patch implements the handling of the `includeAuthorizedOperations` flag in the ConsumerGroupDescribe API.
Reviewers: David Jacot <djacot@confluent.io>
Tests in BrokerLifecycleManagerTest do not close BrokerLifecycleManager
if an assertion fails.
This change makes BrokerLifecycleManager an instance variable that is
closed in an `@AfterEach` method.
Reviewers: Igor Soarez <i@soarez.me>
KAFKA-16606 (#15834) introduced a change that broke
ReassignPartitionsCommandTest.testReassignmentCompletionDuringPartialUpgrade.
The point was to validate that the MetadataVersion supports JBOD
in KRaft when multiple log directories are configured.
We do that by checking the version used in
kafka-features.sh upgrade --metadata, and the version discovered
via a FeatureRecord for metadata.version in the cluster metadata.
There's no point in checking inter.broker.protocol.version in
KafkaConfig, since in KRaft, that configuration is deprecated
and ignored — always assuming the value of MINIMUM_KRAFT_VERSION.
The test that broke sets inter.broker.protocol.version in
KRaft mode and configures 3 log directories. So alternatively, we
could change the test to not configure this property.
Since the property isn't forbidden in KRaft mode, just ignored,
and operators may forget to remove it, it seems better to remove
the failure condition in KafkaConfig.
Reviewers: Luke Chen <showuon@gmail.com>
Support for multiple log directories in KRaft exists from
MetadataVersion 3.7-IV2.
When migrating a ZK broker to KRaft, we already check that
the IBP is high enough before allowing the broker to startup.
With KIP-584 and KIP-778, brokers in KRaft mode do not require
the IBP configuration - the configuration is deprecated.
In KRaft mode inter.broker.protocol.version defaults to
MetadataVersion.MINIMUM_KRAFT_VERSION (IBP_3_0_IV1).
Instead KRaft brokers discover the MetadataVersion by reading
the "metadata.version" FeatureLevelRecord from the cluster metadata.
This change adds a new configuration validation step upon discovering
the "metadata.version" from the cluster metadata.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
Improve consistency and correctness for user-provided timeouts at the Consumer network request layer, per the Java client Consumer timeouts design (https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts). While the changes introduced in KAFKA-15974 enforce timeouts at the Consumer's event layer, this change enforces timeouts at the network request layer.
The changes mostly fit into the following areas:
1. Create shared code and idioms so timeout handling logic is consistent across current and future RequestManager implementations
2. Use deadlineMs instead of expirationMs, expirationTimeoutMs, retryExpirationTimeMs, timeoutMs, etc. (see the sketch after this list)
3. Update "preemptive pruning" to remove expired requests that have had at least one attempt
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Bruno Cadonna <cadonna@apache.org>
This patch moves the `PartitionAssignor` interface and all the related classes to a newly created `group-coordinator/api` module, following the pattern used by the storage and tools modules.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
A patch for KAFKA-15046 got rid of fsync on LeaderEpochFileCache#truncateFromStart/End for performance reasons, but it turned out this could cause a corrupted leader-epoch checkpoint file on ungraceful OS shutdown, i.e. the OS shuts down while the kernel is writing dirty pages back to the device.
To address this problem, this PR makes the following changes: (1) Revert LeaderEpochCheckpoint#write to always fsync
(2) truncateFromStart/End now call LeaderEpochCheckpoint#write asynchronously on scheduler thread
(3) UnifiedLog#maybeCreateLeaderEpochCache now loads epoch entries from checkpoint file only when current cache is absent
Reviewers: Jun Rao <junrao@gmail.com>
During online migration, there could be a ConsumerGroup that has members using the classic protocol. In the current implementation, `STALE_MEMBER_EPOCH` could be thrown in ConsumerGroup offset fetch/commit validation, but it's not supported by the classic protocol. Thus, this patch changes `ConsumerGroup#validateOffsetCommit` and `ConsumerGroup#validateOffsetFetch` to ensure compatibility.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
The PR adds skeleton code for the Share Fetch and Acknowledge RPCs. The changes include:
1. Defining the RPCs in KafkaApis.scala
2. Adding a new SharePartitionManager class which handles the RPCs
3. Adding a SharePartition class which manages in-memory record states for fetched data
Reviewers: David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
The method createPutCacheCallback has an input argument preAppendErrors. It is meant to hold errors that happen before appending. However, it is always empty. Also, pre-append errors are handled before createPutCacheCallback by calling responseCallback. Hence, we can remove preAppendErrors.
Signed-off-by: PoAn Yang <payang@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>
When starting up the kafka LogManager, we check for stray replicas to avoid some corner cases. But this check might prevent the broker from starting up if partition.metadata is missing, because on startup we load logs from files and the topicId of a log comes from the partition.metadata file. So, if partition.metadata is missing, the topicId will be None, and LogManager#isStrayKraftReplica will fail with a "no topicId" error.
The partition.metadata file could be missing due to a storage failure, or because of an unclean shutdown after the topic was created on the replica but before data was flushed into the partition.metadata file. This is possible because we do the flush asynchronously.
When finding a log without a topicId, we should treat it as a stray log and delete it.
Reviewers: Luke Chen <showuon@gmail.com>, Gaurav Narula <gaurav_narula2@apple.com>
Implement the add voter, remove voter, and update voter RPCs for
KIP-853. This is just adding the RPC handling; the current
implementation in RaftManager just throws UnsupportedVersionException.
Reviewers: Andrew Schofield <aschofield@confluent.io>, José Armando García Sancio <jsancio@apache.org>
When PartitionRegistration#merge() reads a PartitionChangeRecord
from an older MetadataVersion, with a replica assignment change
and without #directories() set, it produces a directory assignment
of DirectoryId.UNASSIGNED. This is problematic because the MetadataVersion
may not yet support directory assignments, leading to an
UnwritableMetadataException in PartitionRegistration#toRecord.
Since the Controller always sets directories on PartitionChangeRecord
if the MetadataVersion supports it, via PartitionChangeBuilder,
there's no need for PartitionRegistration#merge() to populate
directories upon a replica assignment change.
Reviewers: Luke Chen <showuon@gmail.com>
This patch fixes a typo in MetadataVersion.IBP_4_0_IV0. It should be 0 not O.
Reviewers: Justine Olshan <jolshan@confluent.io>, Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Allow KRaft replicas to send requests to any node (Node), not just the nodes configured in the
controller.quorum.voters property. This flexibility is needed so KRaft can implement the
controller.quorum.bootstrap.servers configuration, send requests to the dynamically changing set of
voters, and send requests to the leader endpoint (Node) discovered through the KRaft RPCs
(specifically the BeginQuorumEpoch request and Fetch response).
This was achieved by changing the RequestManager API to accept Node instead of just the replica ID.
Internally, the request manager tracks connection state using the Node.idString method to match the
connection management used by NetworkClient.
The API for RequestManager is also changed so that the ConnectState class is not exposed in the
API. This allows the request manager to reclaim heap memory for any connection that is ready.
The NetworkChannel was updated to receive the endpoint information (Node) through the outbound raft
request (RaftRequest.Outbound). This makes the network channel more flexible as it doesn't need to
be configured with the list of all possible endpoints. RaftRequest.Outbound and
RaftResponse.Inbound were updated to include the remote node instead of just the remote id.
The follower state tracked by KRaft replicas was updated to include both the leader id and the
leader's endpoint (Node). In this commit the node value is computed from the set of voters. In a
future commit this will be updated so that it is sent through the KRaft RPCs, for example the
BeginQuorumEpoch request and Fetch response.
Support for configuring controller.quorum.bootstrap.servers was added. This includes changes to
KafkaConfig, QuorumConfig, etc. All of the tests using QuorumTestHarness were changed to use the
controller.quorum.bootstrap.servers instead of the controller.quorum.voters for the broker
configuration. Finally, the node id for the bootstrap server will be decreasing negative numbers
starting with -2.
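For illustration, a broker configured this way would point at the controllers roughly as follows (host names and values are made up):

```java
import java.util.Properties;

// Illustrative broker settings: the broker discovers the controller quorum via
// controller.quorum.bootstrap.servers rather than enumerating controller.quorum.voters.
public class BootstrapServersExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("process.roles", "broker");
        props.put("node.id", "1");
        props.put("controller.listener.names", "CONTROLLER");
        props.put("controller.quorum.bootstrap.servers", "controller-1:9093,controller-2:9093");
        System.out.println(props);
    }
}
```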
Reviewers: Jason Gustafson <jason@confluent.io>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
This PR defines the initial set of RPCs for KIP-932. The RPCs for the admin client and state management are not in this PR.
Reviewers: Apoorv Mittal <amittal@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
When upgrading from a MetadataVersion older than 3.7-IV2, we need to resend the broker registration, so that the controller can record the storage directories. The current code for doing this has several problems, however. One is that it tends to trigger even in cases where we don't actually need it. Another is that when re-registering the broker, the broker is marked as fenced.
This PR moves the handling of the re-registration case out of BrokerMetadataPublisher and into BrokerRegistrationTracker. The re-registration code there will only trigger in the case where the broker sees an existing registration for itself with no directories set. This is much more targeted than the original code.
Additionally, in ClusterControlManager, when re-registering the same broker, we now preserve its fencing and shutdown state, rather than clearing those. (There isn't any good reason re-registering the same broker should clear these things... this was purely an oversight.) Note that we can tell the broker is "the same" because it has the same IncarnationId.
Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Igor Soarez <soarez@apple.com>
This patch introduces the `group.version` feature flag with one version:
1) Version 1 enables the new consumer group rebalance protocol (KIP-848).
Reviewers: Justine Olshan <jolshan@confluent.io>
Add more unit tests to LogSegments and do some small refactoring in LogSegments.java.
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
As per KIP-1022, we will rename the unstable metadata versions enabled config to support all feature versions.
Features is also updated to return latest production and latest testing versions of each feature.
A feature is production ready when the corresponding metadata version (bootstrapMetadataVersion) is production ready.
Adds tests for the feature usage of the unstableFeatureVersionsEnabled config
Reviewers: David Jacot <djacot@confluent.io>, Jun Rao <junrao@gmail.com>
While working on https://github.com/apache/kafka/pull/16120, I noticed that the transaction verification feature is disabled in `TransactionsTest` when the new group coordinator is enabled. We did this initially because the feature was not available in the new group coordinator but we fixed it a long time ago. We can enable it now.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch exposes the group coordinator config `CONSUMER_GROUP_MIGRATION_POLICY_CONFIG`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
This PR does the following:
- System tests should bring up the Kafka broker in native mode
- System tests should run against the Kafka broker in native mode
- Extract out the native build command so that it can be reused
- Allow system tests to run on a native Kafka broker using the Docker mechanism
To run system tests by bringing up Kafka in native mode, pass kafka_mode as native in the ducktape globals:
--globals '{\"kafka_mode\":\"native\"}'
To run system tests by bringing up Kafka in native mode via the Docker mechanism:
_DUCKTAPE_OPTIONS="--globals '{\"kafka_mode\":\"native\"}'" TC_PATHS="tests/kafkatest/tests/" bash tests/docker/run_tests.sh
To only bring up ducker nodes to cater for native Kafka:
bash tests/docker/ducker-ak up -m native
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Added the implementation of the quota manager that will be used to throttle copy and fetch requests to and from remote storage. Reference: KIP-956
Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kchandraprakash@uber.com>, Jun Rao <junrao@gmail.com>
This patch disallows enabling the new consumer rebalance protocol in ZK mode.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Justine Olshan <jolshan@confluent.io>
In BrokerServer.scala, brokerMetadataPublishers are configured, but when metadata updates arrive, remoteLogManager is not yet configured.
For example, remoteLogManager.foreach(rlm => rlm.onLeadershipChange(partitionsBecomeLeader.asJava, partitionsBecomeFollower.asJava, topicIds)) in ReplicaManager is invoked after the publishers are instantiated, and there rlm has the relevant managers configured.
This change makes sure rlm is configured before the brokerMetadataPublishers are initialized.
Reviewers: Luke Chen <showuon@gmail.com>, Nikhil Ramakrishnan <nikrmk@amazon.com>
As part of KIP-1022, I have created an interface for all the new features to be used when parsing the command line arguments, doing validations, getting default versions, etc.
I've also added the --feature flag to the storage tool to show how it will be used.
Created a TestFeatureVersion to show an implementation of the interface (besides MetadataVersion which is unique) and added tests using this new test feature.
I will add the unstable config and tests in a followup.
Reviewers: David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jun Rao <junrao@apache.org>
KIP-932 introduces share groups to go alongside consumer groups. Both kinds of group use server-side assignors but it is unlikely that a single assignor class would be suitable for both. As a result, the KIP introduces specific interfaces for consumer group and share group partition assignors.
This PR introduces only the consumer group interface, `o.a.k.coordinator.group.assignor.ConsumerGroupPartitionAssignor`. The share group interface will come in a later release. The existing implementations of the general `PartitionAssignor` interface have been changed to implement `ConsumerGroupPartitionAssignor` instead and all other code changes are just propagating the change throughout the codebase.
Note that the code in the group coordinator that actually calculates assignments uses the general `PartitionAssignor` interface so that it can be used with both kinds of group, even though the assignors themselves are specific.
Reviewers: Apoorv Mittal <amittal@confluent.io>, David Jacot <djacot@confluent.io>
After JBOD is supported in KRaft, we should also enable JBOD support in tiered storage. Unit tests and Integration tests are also added.
Reviewers: Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Igor Soarez <soarez@apple.com>, Mickael Maison <mickael.maison@gmail.com>
When moving from KafkaConfig.ReplicaFetchMaxBytesProp we used ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG instead of ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG. This PR fixes that.
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
When doing alter replica logDirs, we create a future log and pause log cleaning for the partition. The log cleaning pause is resumed after the alter replica logDirs completes. When resuming log cleaning, we decrement the LogCleaningPaused count by 1; only once the count reaches 0 does cleaning actually resume.
But there's one other factor that can increase the LogCleaningPaused count: leadership change. When there's a leadership change, we check if there's a future log for the partition; if so, we create the future log and pause cleaning (LogCleaningPaused count + 1). So, during an alter replica logDirs:
1. alter replica logDirs for tp0 triggered (LogCleaningPaused count = 1)
2. tp0 leadership changed (LogCleaningPaused count = 2)
3. alter replica logDirs completes, resuming logCleaning (LogCleaningPaused count = 1)
4. LogCleaning keeps paused because the count is always > 0
This PR fixes the issue by only calling abortAndPauseCleaning when the future log does not exist, as sketched below. We already do the same check in alterReplicaLogDirs, so this change makes sure there's only one abortAndPauseCleaning for either alterReplicaLogDirs or maybeAddLogDirFetchers. Tests are also added.
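A rough Java sketch of that guard (hypothetical names; the real change is in the Scala ReplicaManager code path):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: pause cleaning only if the future log does not already exist, so a
// leadership change during an in-flight alterReplicaLogDirs does not bump the
// LogCleaningPaused count a second time.
class LogDirFetcherSketch {
    private final Map<String, Object> futureLogs = new ConcurrentHashMap<>();
    private int logCleaningPausedCount = 0;

    void maybeAddLogDirFetchers(String topicPartition) {
        if (!futureLogs.containsKey(topicPartition)) {    // no future log yet
            futureLogs.put(topicPartition, new Object()); // create the future log
            abortAndPauseCleaning(topicPartition);        // pause cleaning exactly once
        }
        // if a future log already exists, alterReplicaLogDirs already paused
        // cleaning for this partition, so don't pause again here
    }

    private void abortAndPauseCleaning(String topicPartition) {
        logCleaningPausedCount++; // decremented when the log dir move completes
    }
}
```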
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Igor Soarez <soarez@apple.com>
This patch was initially created in #15536.
When there is a commit for multiple topic partitions and some, but not all, exceed the offset metadata limit, the pending commit is not properly cleaned up leading to UNSTABLE_OFFSET_COMMIT errors when trying to fetch the offsets with read_committed. This change makes it so the invalid commits are not added to the pendingOffsetCommits set.
Co-authored-by: Kyle Phelps <kyle.phelps@datadoghq.com>
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Justine Olshan <jolshan@confluent.io>
Don't throw an OFFSET_OUT_OF_RANGE error when converting the offset to offset metadata; instead, the leader should increment the high watermark by itself after receiving the next fetch requests from followers. This can happen when checkpoint files are missing and the broker is elected leader.
Reviewers: Luke Chen <showuon@gmail.com>, Jun Rao <junrao@apache.org>
ZkMetadataCache could theoretically return KRaft controller information from a call to
ZkMetadataCache.getAliveBrokerNode, which doesn't make sense. KRaft controllers are not part of the
set of brokers. The only use-case for this functionality was in MetadataCacheControllerNodeProvider
during ZK migration, where it allowed ZK brokers in migration mode to forward requests to
kcontrollers when appropriate. This PR changes MetadataCacheControllerNodeProvider to simply
delegate to quorumControllerNodeProvider in this case.
Reviewers: José Armando García Sancio <jsancio@apache.org>
Fix the code in the RaftControllerNodeProvider to query RaftManager to find Node information,
rather than consulting a static map. Add a RaftManager.voterNode function to supply this
information. In KRaftClusterTest, add testControllerFailover to get more coverage of controller
failovers.
Reviewers: José Armando García Sancio <jsancio@apache.org>
A broker that is unable to communicate with the controller will shut down
after the configurable log.dir.failure.timeout.ms.
The implementation adds a new event to the Kafka EventQueue. This event
is deferred by the configured timeout and will execute the shutdown
if the heartbeat communication containing the failed log dir is still
pending with the controller.
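A simplified sketch of the idea, using a plain JDK scheduler instead of the broker's event queue and with hypothetical helper names:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Sketch: when a log dir fails, defer a check by log.dir.failure.timeout.ms and
// shut the broker down only if the controller still hasn't acknowledged the failure.
class LogDirFailureShutdownSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void onLogDirFailure(long failureTimeoutMs,
                         BooleanSupplier failedDirStillPendingWithController, // hypothetical check
                         Runnable beginShutdown) {
        scheduler.schedule(() -> {
            if (failedDirStillPendingWithController.getAsBoolean())
                beginShutdown.run(); // heartbeat with the failed dir still unacknowledged
        }, failureTimeoutMs, TimeUnit.MILLISECONDS);
    }
}
```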
Reviewers: Igor Soarez <soarez@apple.com>
This PR aims to add Docker Image for GraalVM based Native Kafka Broker as per the following KIP - https://cwiki.apache.org/confluence/display/KAFKA/KIP-974%3A+Docker+Image+for+GraalVM+based+Native+Kafka+Broker
This PR adds the following functionality:
- Ability to build the Docker image for native Apache Kafka
  - Dockerfile
  - Launch script
  - Metadata configs required by GraalVM native-image
- Kafka startup ability in KafkaDockerWrapper.scala
- Ability to build and test the image, integrated with the existing JVM Docker image framework
In #15837, we introduced the change to allow calling the WriteTxnMarkers API with AlterCluster permissions. This PR proposes 2 enhancements:
- When a WriteTxnMarkers request is received, it is first authorized against the alter cluster permission. If the user does not have this permission, a 'deny' will be logged. However, if the user does have the cluster action permission, the request will be successfully authorized. Don't log the first deny to avoid confusion.
- Add a `WriteTxnMarkersRequest` to be called from the test `testAuthorizationWithTopicExisting`, so that the request can be exercised and verified with both possible permissions.
Author: Nikhil Ramakrishnan <nikrmk@amazon.com>
Reviewers: Christo Lolov <lolovc@amazon.com>
Closes #15952 from nikramakrishnan/kip1037-addTest
Move existing rebalance callback + consumer.position test to the PlaintextConsumerCallbackTest file (refactored to reuse the new helper funcs available)
Add new integration tests for callbacks interaction with seek and pause
Minor cleanup in the callbacks test file
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
We have discovered during large scale performance tests that the current write path of the new coordinator does not scale well. The issue is that each write operation writes synchronously from the coordinator threads. Coalescing records into bigger batches helps drastically because it amortizes the cost of writes. Aligning the batches with the snapshots of the timeline data structures also reduces the number of in-flight snapshots.
This patch is the first of a series of patches that will bring records coalescing into the coordinator runtime. As a first step, we had to rework the PartitionWriter interface and move the logic to build MemoryRecords from it to the CoordinatorRuntime. The main changes are in these two classes. The others are related mechanical changes.
Reviewers: Justine Olshan <jolshan@confluent.io>
An exception thrown while closing the client instances in `IntegrationTestHarness::tearDown` may result in `KafkaServerTestHarness::tearDown` not being invoked. This would result in thread leaks of the broker and controller threads spawned in the failing test.
An example of this is the [CI run](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15994/1/tests) for #15994 where `Build / JDK 8 and Scala 2.12 / testCoordinatorFailover(String, String).quorum=kraft+kip848.groupProtocol=consumer – kafka.api.PlaintextConsumerTest` failing results in `consumers.foreach(_.close(Duration.ZERO))` in `IntegrationTestHarness::tearDown` throwing an exception.
A side effect of this is that it poisons the Gradle test runner JVM and prevents tests in other unrelated classes from executing, as the `@BeforeAll` check in QuorumTestHarness would cause them to fail immediately.
This PR encloses the client closure in try-finally to ensure `KafkaServerTestHarness::tearDown` is always invoked.
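A minimal sketch of the pattern, shown in Java with placeholder classes rather than the actual Scala harness:

```java
import org.junit.jupiter.api.AfterEach;

// Sketch: always invoke the parent harness tearDown, even if closing client
// instances throws, so broker/controller threads are not leaked.
class IntegrationHarnessSketch extends ServerHarnessSketch {
    @AfterEach
    @Override
    void tearDown() {
        try {
            closeClients();   // e.g. consumers.foreach(_.close(Duration.ZERO)) in the Scala code
        } finally {
            super.tearDown(); // the KafkaServerTestHarness::tearDown equivalent always runs
        }
    }

    private void closeClients() { /* close producers, consumers, admin clients */ }
}

class ServerHarnessSketch {
    void tearDown() { /* shut down brokers and controllers */ }
}
```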
Reviewers: Nikhil Ramakrishnan <nikrmk@amazon.com>, Igor Soarez <soarez@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>
We weren't enabling discoverBrokerVersions to check the supported versions in the AddPartitionsToTxnManager. This means that any verification request (or any AddPartitionsToTxnRequest version) from a newer broker would fail when sending to an older broker.
The bulk of this change is adding additional transactions system tests for old versions.
One test upgrades the cluster completely. This didn't catch the issue but could be useful.
The other test forces a new broker to send a verification request to an older one. Without the discoverBrokerVersions change, all tests between mixed brokers failed. (We introduced a new request version in 3.8 -- which is a separate version from the one that caused the bug for 3.5 -> 3.6) With the addition, the tests all passed.
I also manually ran a test for 3.5 -> 3.6 since the issue there was slightly different and was caused by the unstableLatestVersion flag being enabled. This change should fix this as well. 👍
Reviewers: David Jacot <djacot@confluent.io>
Allow KRaft replicas to read and write version 0 and 1 of the quorum-state file. Which version is written is controlled by the kraft.version. With kraft.version 0, version 0 of the quorum-state file is written. With kraft.version 1, version 1 of the quorum-state file is written. Version 1 of the quorum-state file adds the VotedDirectoryId field and removes the CurrentVoters. The other fields removed in version 1 are not important as they were not overwritten or used by KRaft.
In kraft.version 1 the set of voters will be stored in the kraft partition log segments and snapshots.
To implement this feature the following changes were made to KRaft.
FileBasedStateStore was renamed to FileQuorumStateStore to better match the name of the implemented interface QuorumStateStore.
The QuorumStateStore::writeElectionState was extended to include the kraft.version. This version is used to determine which version of QuorumStateData to store. When writing version 0, the VotedDirectoryId is not persisted but the latest value is kept in-memory. This allows the replica to vote consistently while it stays online. If a replica restarts in the middle of an election, it will forget the VotedDirectoryId if the kraft.version is 0. This should be rare in practice and should only happen if there is an election and a failure while the system is upgrading to kraft.version 1.
The type ElectionState, the interface EpochState and all of the implementations of EpochState (VotedState, UnattachedState, FollowerState, ResignedState, CandidateState and LeaderState) are extended to support the new voted directory id.
The type QuorumState is changed so that the local directory id is used. The type is also changed so that the latest value for the set of voters and the kraft version is queried from the KRaftControlRecordStateMachine.
The replica directory id is read from the meta.properties and passed to the KafkaRaftClient. The replica directory id is guaranteed to be set in the local replica.
Adds a new metric for current-vote-directory-id which exposes the latest in-memory value of the voted directory id.
Renames VoterSet.VoterKey to ReplicaKey.
It is important to note that after this change, version 1 of the quorum-state file will not yet be written by KRaft controllers and brokers. This change adds support for reading and writing version 1 of the file in preparation for future changes.
Reviewers: Jun Rao <junrao@apache.org>
We need to add an example test to the PlaintextConsumerTest that tests a common
ConsumerRebalanceListener use case. For example, create an integration test that
invokes the Consumer API to commit offsets in the onPartitionsRevoked callback.
This test is implemented in a reasonably general way with a view to using it as a
template from which other tests can be created later. Eventually we will need to
have a comprehensive set of tests that cover all the basic use cases.
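For context, the use case boils down to a listener along these lines (a sketch, not the actual test code):

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Sketch: commit offsets synchronously when partitions are revoked, the common
// ConsumerRebalanceListener use case the integration test exercises.
class CommitOnRevokeListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;

    CommitOnRevokeListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(); // commit before ownership of the partitions is lost
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // nothing to do on assignment in this example
    }
}
```

The test would register such a listener via something like consumer.subscribe(topics, new CommitOnRevokeListener(consumer)) and verify the committed offsets after a rebalance.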
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Bruno Cadonna <cadonna@apache.org>
This patch deprecates `offsets.commit.required.acks` in Apache Kafka 3.8 as described in KIP-1041: https://cwiki.apache.org/confluence/x/9YobEg.
Reviewers: Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
KIP-227 introduced in-memory caching of FetchSessions. Brokers with a large number of Fetch requests suffer from contention on trying to acquire a lock on FetchSessionCache.
This change aims to reduce lock contention for FetchSessionCache by sharding the cache into multiple segments, each responsible for an equal range of sessionIds. Assuming Fetch requests have a uniform distribution of sessionIds, the probability of contention on a segment is reduced by a factor of the number of segments.
We ensure backwards compatibility by ensuring the total number of cache entries remains the same as configured and that sessionIds are randomly allocated.
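A minimal sketch of the sharding scheme (hypothetical names, not the actual FetchSessionCache code):

```java
// Sketch: split the session cache into shards, each owning an equal range of
// sessionIds and its own lock, so concurrent Fetch requests rarely contend.
class ShardedFetchSessionCacheSketch {
    private final Object[] shardLocks;
    private final int rangeSize;

    ShardedFetchSessionCacheSketch(int numShards) {
        shardLocks = new Object[numShards];
        for (int i = 0; i < numShards; i++)
            shardLocks[i] = new Object();
        // sessionIds are positive ints; each shard owns a contiguous, equal range
        rangeSize = (Integer.MAX_VALUE / numShards) + 1;
    }

    void touchSession(int sessionId) {
        Object lock = shardLocks[sessionId / rangeSize];
        synchronized (lock) {
            // look up / update only this shard's entries; other shards stay unlocked
        }
    }
}
```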
Reviewers: Igor Soarez <soarez@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>
Introduce a new field `id` in the ClusterConfigProperty annotation. The main purpose of the new field is to define a property for a specific broker/controller (KRaft). The default value is -1, which means the ClusterConfigProperty applies to all brokers/controllers.
Note that under Type.KRAFT mode, controller ids start from 3000 and increment by one each time. In other modes, broker/controller ids start from 0 and increment by one.
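A hedged usage sketch (assuming the existing @ClusterTest serverProperties wiring in the test framework; the property keys are just examples):

```java
// Sketch: the first property applies only to the KRaft controller with id 3000,
// the second (id left at the default -1) applies to every broker and controller.
@ClusterTest(serverProperties = {
    @ClusterConfigProperty(id = 3000, key = "metadata.log.segment.bytes", value = "1048576"),
    @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false")
})
public void testPerNodeProperty(ClusterInstance cluster) {
    // assertions against the cluster would go here
}
```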
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
As a part of 2e8d69b78c, we had introduced the TransactionAbortableException in AK. On more detailed analysis we figured out that the enum name SupportedOperation was a bit misleading, hence we renamed it to TransactionSupportedOperation to allow a better and more clearly defined function signature.
Reviewers: Justine Olshan <jolshan@confluent.io>
When there are overlapping segments in remote storage, the deletion may fail to remove the segments due to the isRemoteSegmentWithinLeaderEpochs check. Once deletion starts to fail for a partition, its segments won't be eligible for cleanup. The one workaround that we have is to move the log-start-offset using the kafka-delete-records script.
Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>
Fix to allow initializing positions for newly assigned partitions while the onPartitionsAssigned callback is running, even though the partitions remain non-fetchable until the callback completes.
Before this PR, we were not allowing initialization or fetching while the callback was running. The fix here only allows initializing the newly assigned partition's position, and keeps the existing logic for making sure that the partition remains non-fetchable until the callback completes.
The need for this fix came out of one of the connect system tests, which attempts to retrieve a newly assigned partition's position with a call to consumer.position from within the onPartitionsAssigned callback (WorkerSinkTask). With this PR, we allow such calls (test added), which is the behaviour of the legacy consumer.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
The patch implements the JoinGroup API for the new consumer groups. It allows members using the classic rebalance protocol with the consumer embedded protocol to join a new consumer group.
Reviewers: David Jacot <djacot@confluent.io>
When the consumer group protocol is used in a cluster, it is, at the moment, impossible to see all records stored in the __consumer_offsets topic with kafka-dump-log --offsets-decoder. It does not know how to handle all the new records.
This patch refactors the OffsetsMessageParser used internally by kafka-dump-log to use the RecordSerde used by the new group coordinator. It ensures that the tool is always in sync with the coordinator implementation. The patch also changes the format to use the toString'ed representations of the records instead of having custom logic to dump them. It ensures that all the information is always dumped. The downside of the latter is that inner byte arrays (e.g. assignment in the classic protocol) are no longer deserialized. Personally, I feel that it is acceptable and it is actually better to stay as close as possible to the actual records in this tool. It also avoids issues like https://issues.apache.org/jira/browse/KAFKA-15603.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This patch fixes two issues with IncrementalAlterConfigs and the ZK migration. First, it changes the handling of IncrementalAlterConfigs to check if the controller is ZK vs KRaft and only forward for KRaft. Second, it adds a check in KafkaZkClient#setOrCreateEntityConfigs to ensure a ZK broker is not directly modifying configs in ZK if there is a KRaft controller. This closes the race condition between KRaft taking over as the active controller and the ZK brokers learning about this.
*Forwarding*
During the ZK migration, there is a time when the ZK brokers are running with migrations enabled, but KRaft has yet to take over as the controller. Prior to KRaft taking over as the controller, the ZK brokers in migration mode were unconditionally forwarding IncrementalAlterConfigs (IAC) to the ZK controller. This works for some config types, but breaks when setting BROKER and BROKER_LOGGER configs for a specific broker. The behavior in KafkaApis for IAC was to always forward if the forwarding manager was defined. Since ZK brokers in migration mode have forwarding enabled, the forwarding would happen, and the special logic for BROKER and BROKER_LOGGER would be missed, causing the request to fail.
With this fix, the IAC handler will check if the controller is KRaft or ZK and only forward for KRaft.
*Protected ZK Writes*
As part of KIP-500, we moved most (but not all) ZK mutations to the ZK controller. One of the things we did not move fully to the controller was entity configs. This is because there was some special logic that needed to run on the broker for certain config updates. If a broker-specific config was set, AdminClient would route the request to the proper broker. In KRaft, we have a different mechanism for handling broker-specific config updates.
Leaving this ZK update on the broker side would be okay if we were guarding writes on the controller epoch, but it turns out KafkaZkClient#setOrCreateEntityConfigs does unprotected "last writer wins" updates to ZK. This means a ZK broker could update the contents of ZK after the metadata had been migrated to KRaft. No good! To fix this, this patch adds a check on the controller epoch to KafkaZkClient#setOrCreateEntityConfigs but also adds logic to fail the update if the controller is a KRaft controller.
The new logic in setOrCreateEntityConfigs adds STALE_CONTROLLER_EPOCH as a new exception that can be thrown while updating configs.
Reviewers: Luke Chen <showuon@gmail.com>, Akhilesh Chaganti <akhileshchg@users.noreply.github.com>, Chia-Ping Tsai <chia7712@gmail.com>
Adds support for the KafkaRaftClient to read the control records KRaftVersionRecord and VotersRecord in the snapshot and log. As the control records in the KRaft partition are read, the replica's known set of voters are updated. This change also contains the necessary changes to include the control records when a snapshot is generated by the KRaft state machine.
It is important to note that this commit changes the code and the in-memory state to track the sets of voters but it doesn't change any data that is externally exposed. It doesn't change the RPCs, data stored on disk or configuration.
When the KRaft replica starts, the PartitionListener reads the latest snapshot and then the log segments up to the LEO, updating the in-memory state as it reads KRaftVersionRecord and VotersRecord. When the replica (leader and follower) appends to the log, the PartitionListener catches up to the new LEO. When the replica truncates the log because of a diverging epoch, the PartitionListener also truncates the in-memory state to the new LEO. When the state machine generates a new snapshot, the PartitionListener trims any prefix entries that are not needed. This is all done to minimize the amount of data tracked in-memory and to make sure that it matches the state on disk.
To implement the functionality described above this commit also makes the following changes:
Adds control records for KRaftVersionRecord and VotersRecord. KRaftVersionRecord describes the finalized kraft.version supported by all of the replicas. VotersRecord describes the set of voters at a specific offset.
Changes Kafka's feature version to support 0 as the smallest valid value. This is needed because the default value for kraft.version is 0.
Refactors FileRawSnapshotWriter so that it doesn't directly call the onSnapshotFrozen callback. It adds NotifyingRawSnapshotWriter for calling such callbacks. This reorganization is needed because in this change both the KafkaMetadataLog and the KafkaRaftClient need to react to snapshots getting frozen.
Cleans up KafkaRaftClient's initialization. Removes initialize from RaftClient - this is an implementation detail that doesn't need to be exposed in the interface. Removes RaftConfig.AddressSpec and simplifies the bootstrapping of the static voter's address. The bootstrapping of the address is delayed because of tests. We should be able to simplify this further in future commits.
Update the DumpLogSegment CLI to support the new control records KRaftVersionRecord and VotersRecord.
Fix the RecordsSnapshotReader implementations so that the iterator includes control records. RecordsIterator is extended to support reading the new control records.
Improve the BatchAccumulator implementation to allow multiple control records in one control batch. This is needed so that KRaft can make sure that VotersRecord is included in the same batch as the control record (KRaftVersionRecord) that upgrades the kraft.version to 1.
Add a History interface and default implementation TreeMapHistory. This is used to track all of the sets of voters between the latest snapshot and the LEO. This is needed so that KafkaRaftClient can query for the latest set of voters and so that KafkaRaftClient can include the correct set of voters when the state machine generates a new snapshot at a given offset.
Add a builder pattern for RecordsSnapshotWriter. The new builder pattern also implements including the KRaftVersionRecord and VotersRecord control records in the snapshot as necessary. A KRaftVersionRecord should be appended if the kraft.version is greater than 0 at the snapshot's offset. Similarly, a VotersRecord should be appended to the snapshot with the latest value up to the snapshot's offset.
Reviewers: Jason Gustafson <jason@confluent.io>
The AsyncKafkaConsumer implementation of position(TopicPartition, Duration) was not updating its internal Timer, causing it to execute the loop forever. Adding a call to update the Timer at the bottom of the loop fixes the issue.
An integration test was added to catch this case; it fails without the newly added call to Timer.update(long).
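A simplified sketch of the loop shape after the fix (illustrative helper, not the actual AsyncKafkaConsumer code):

```java
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

// Sketch: update the Timer at the bottom of each iteration so elapsed time is
// observed and the loop can expire instead of spinning forever.
class PositionLoopSketch {
    Long waitForPosition(Time time, Timer timer) {
        do {
            Long position = tryGetPosition();  // hypothetical lookup
            if (position != null)
                return position;
            // ... wait for background events for up to timer.remainingMs() ...
            timer.update(time.milliseconds()); // the call that was missing
        } while (timer.notExpired());
        return null;                           // timed out
    }

    private Long tryGetPosition() { return null; }
}
```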
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Instead of staying pending forever, this PR invokes the next schedule after 1ms. However, the side effect is busy-waiting. Hence, this PR also updates the docs to remind users about that, i.e. the issue with a small log.segment.delete.delay.ms.
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This PR does the following cleanup for TestUtils.scala:
1) remove unused methods
2) move methods used by a single test class out of TestUtils
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
ControllerRegistrationManagerTest is flaky due to the poll in L221. The potential root cause is a race condition between the first poll (L221) and the second poll (L229). Before the second poll, we mock a response (L226), which should be processed by the second poll. However, if the first poll takes it away, the second poll gets nothing, and this can lead to an error.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
* Make ClusterConfig immutable
* Make BrokerNode immutable
* Refactor out build argument in ControllerNode
* Add setPrefix and replace put property with set map in ClusterConfig
* Remove rollingBrokerRestart from ClusterInstance interface
* Refactor KRaftClusterTest#doOnStartedKafkaCluster
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Even if the log start offset is updated, the log deletion might still not be completed. Make the test more robust.
Reviewers: Luke Chen <showuon@gmail.com>
If there are some logs to be deleted during a log dir movement, we schedule the deletion to be done later.
However, once the log dir movement completes and the future log is renamed, the async log deletion will fail with a "no such file" error.
Signed-off-by: PoAn Yang <payang@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, SoontaekLim <soontaek.lim@neya.kr>, Johnny Hsu <johnnyhsu@fb.com>
The javadoc for KafkaConsumer.commitSync says:
Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
(or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
This is not always true in the async consumer, where there is no code at all to make sure that the callback is executed before commitSync returns.
Similarly, the async consumer is also missing logic to await callback execution in close. While the javadoc doesn't explicitly promise callback execution, it promises "completing commits", which one would reasonably expect to include callback execution. Also, the legacy consumer contains some code to execute callbacks before closing.
This change proposes a number of fixes to clean up the callback execution guarantees in the async consumer:
- We keep track of the incomplete async commit futures and wait for them to complete before returning from commitSync or close (if there is time), as sketched below.
- Since we need to block to make sure that our previous commits are completed, we allow the consumer to wake up.
Some similar gaps are addressed in the legacy consumer, see #15693
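A minimal sketch of the bookkeeping idea, using plain Java futures rather than the consumer's internal types:

```java
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Sketch: remember each in-flight async commit future and, before commitSync
// returns (or before close), wait for them so their callbacks have run.
class AsyncCommitTracker {
    private final Set<CompletableFuture<Void>> incomplete = ConcurrentHashMap.newKeySet();

    void track(CompletableFuture<Void> commitFuture) {
        incomplete.add(commitFuture);
        // the commit callback runs as part of completing the future
        commitFuture.whenComplete((result, error) -> incomplete.remove(commitFuture));
    }

    // called at the start of commitSync/close: block, within the remaining time,
    // until previously submitted async commits (and their callbacks) are done
    void awaitIncomplete(Duration remaining) throws Exception {
        CompletableFuture.allOf(incomplete.toArray(new CompletableFuture[0]))
            .get(remaining.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```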
Testing
Two new integration tests and a couple of unit tests.
Reviewers: Bruno Cadonna <cadonna@apache.org>, Kirk True <ktrue@confluent.io>, Lianet Magrans <lianetmr@gmail.com>
It is observed that for scenario (3), i.e. a broker crashes while it
waits for the future replica to catch up for the second time and
`dir1` is unavailable when the broker is restarted, the
broker tries to create the partition in `dir2` according to the metadata
in the controller. However, ReplicaManager also tries to resume the
stale future replica which was abandoned when the broker crashed. This
results in the renaming of the future replica to fail eventually because
the directory for the topic partition already exists in `dir2` and the
broker then marks `dir2` as offline.
This PR attempts to fix this behaviour by ignoring any future replicas
which are in the same directory as where the log exists. It further
marks the stale future replica for deletion.
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Igor Soarez <soarez@apple.com>, Proven Provenzano <pprovenzano@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>