Introduced a share fetch purgatory on the broker which delays share fetch requests that cannot be completed instantaneously. Introduced 2 new classes -
DelayedShareFetch - Contains logic to instantaneously complete or force complete a share fetch request on timeout.
DelayedShareFetchKey - Contains the key which can be used to watch the entries within the share fetch purgatory.
ShareFetchUtils - This utility class contains functionalities required for post-processing once the replica manager fetch is completed.
There are many scenarios which can cause a share fetch request to be delayed and multiple scenarios when a delayed share fetch can be attempted to be completed. In this PR, we are only targeting the case when record lock partition limit is reached, the ShareFetch should wait for up to MaxWaitMs for records to be released.
Reviewers: David Arthur <mumrah@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Jun Rao <junrao@gmail.com>
Introduces the share coordinator. This coordinator is built on the new coordinator runtime framework. It
is responsible for persistence of share-group state in a new internal topic named "__share_group_state".
The responsibility for being a share coordinator is distributed across the brokers in a cluster.
Reviewers: David Arthur <mumrah@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
The COPY_SEGMENT_STARTED state segments are counted when calculating remote retention size. This causes unexpected retention size breached segment deletion. This PR fixes it by
1. only counting COPY_SEGMENT_FINISHED and DELETE_SEGMENT_STARTED state segments when calculating remote log size.
2. During copy Segment, if we encounter errors, we will delete the segment immediately.
3. Tests added.
Co-authored-by: Guillaume Mallet <>
Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>, Guillaume Mallet <>
The patch adds support of alter/describe configs for group in kafka-configs.sh.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
Introduce ShareCoordinator interface and related classes.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, David Arthur <mumrah@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Add the version check to client side when building ListOffsetRequest for the specific timestamp:
1) the version must be >=8 if timestamp=-4L (EARLIEST_LOCAL_TIMESTAMP)
2) the version must be >=9 if timestamp=-5L (LATEST_TIERED_TIMESTAMP)
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This patch introduces a wrapper around [HdrHistogram](https://github.com/HdrHistogram/HdrHistogram) to use for group coordinator histograms, event queue time, event processing time, flush time, and purgatory time.
Reviewers: David Jacot <djacot@confluent.io>
Establishes the new `:share` Gradle module. This module is intended to be used for server-side KIP-932 classes that are not part of the new share group coordinator.
This patch relocates and renames some existing classes. A small amount of compatibility changes were also made, but do not affect any logic.
Reviewers: Andrew Schofield <aschofield@confluent.io>, David Arthur <mumrah@gmail.com>
What
Introduced a new module share-coordinator to house relevant implementation code and resources.
Modified settings.gradle and build.gradle to accommodate the module.
Added ShareSnapshot[Key, Value], ShareUpdate[Key, Value] message record schemas.
Introduced a trivial impl of ShareCoordinatorShard class to establish dependencies with other modules (:coordinator-common, :metadata). The actual impl for this class will be done in future PRs.
Why
The share coordinator component has been introduced as part of KIP-932 (QFK). This component is will be responsible for managing persistence for various data related to share partitions into a dedicated internal topic.
To keep all this functionality contained, we want to create a separate module in line with group and transaction coordinators.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
There is a lot of code in group-coordinator which is not share/consumer/classic group specific.
Since we are introducing a share-coordinator as part of KIP-932 (in a new module), it would make sense to get the common coordinator functionality into a separate common coordinator module so that share-coordinator need not depend on group-coordinator.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
In MetadataVersion 3.7-IV2 and above, the broker's AssignmentsManager sends an RPC to the
controller informing it about which directory we have chosen to place each new replica on.
Unfortunately, the code does not check to see if the topic still exists in the MetadataImage before
sending the RPC. It will also retry infinitely. Therefore, after a topic is created and deleted in
rapid succession, we can get stuck including the now-defunct replica in our subsequent
AssignReplicasToDirsRequests forever.
In order to prevent this problem, the AssignmentsManager should check if a topic still exists (and
is still present on the broker in question) before sending the RPC. In order to prevent log spam,
we should not log any error messages until several minutes have gone past without success.
Finally, rather than creating a new EventQueue event for each assignment request, we should simply
modify a shared data structure and schedule a deferred event to send the accumulated RPCs. This
will improve efficiency.
Reviewers: Igor Soarez <i@soarez.me>, Ron Dagostino <rndgstn@gmail.com>
This PR adds support for add-controller and remove-controller in the kafka-metadata-quorum.sh
command-line tool. It also fixes some minor server-side bugs that blocked the tool from working.
In kafka-metadata-quorum.sh, the implementation of remove-controller is fairly straightforward. It
just takes some command-line flags and uses them to invoke AdminClient. The add-controller
implementation is a bit more complex because we have to look at the new controller's configuration
file. The parsing logic for the advertised.listeners and listeners server configurations that we
need was previously implemented in the :core module. However, the gradle module where
kafka-metadata-quorum.sh lives, :tools, cannot depend on :core. Therefore, I moved listener parsing
into SocketServerConfigs.listenerListToEndPoints. This will be a small step forward in our efforts
to move Kafka configuration out of :core.
I also made some minor changes in kafka-metadata-quorum.sh and Kafka-storage-tool.sh to handle
--help without displaying a backtrace on the screen, and give slightly better error messages on
stderr. Also, in DynamicVoter.toString, we now enclose the host in brackets if it contains a colon
(as IPV6 addresses can).
This PR fixes our handling of clusterId in addRaftVoter and removeRaftVoter, in two ways. Firstly,
it marks clusterId as nullable in the AddRaftVoterRequest.json and RemoveRaftVoterRequest.json
schemas, as it was always intended to be. Secondly, it allows AdminClient to optionally send
clusterId, by using AddRaftVoterOptions and RemoveRaftVoterOptions. We now also remember to
properly set timeoutMs in AddRaftVoterRequest. This PR adds unit tests for
KafkaAdminClient#addRaftVoter and KafkaAdminClient#removeRaftVoter, to make sure they are sending
the right things.
Finally, I fixed some minor server-side bugs that were blocking the handling of these RPCs.
Firstly, ApiKeys.ADD_RAFT_VOTER and ApiKeys.REMOVE_RAFT_VOTER are now marked as forwardable so that
forwarding from the broker to the active controller works correctly. Secondly,
org.apache.kafka.raft.KafkaNetworkChannel has now been updated to enable API_VERSIONS_REQUEST and
API_VERSIONS_RESPONSE.
Co-authored-by: Murali Basani muralidhar.basani@aiven.io
Reviewers: José Armando García Sancio <jsancio@apache.org>, Alyssa Huang <ahuang@confluent.io>
The purpose of this PR is to remove ConsumerTestBuilder.java since it is no longer needed. The following PRs have eliminated the use of ConsumerTestBuilder:
#14930#16140#16200#16312
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This pr support EarliestLocalSpec LatestTierSpec in GetOffsetShell, and add integration tests.
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang <payang@apache.org>
As part of KIP-853, storage-tool.sh now has two new flags: --standalone, and --initial-voters. This PR implements these two flags in storage-tool.sh.
There are currently two valid ways to format a cluster:
The pre-KIP-853 way, where you use a statically configured controller quorum. In this case, neither --standalone nor --initial-voters may be specified, and kraft.version must be set to 0.
The KIP-853 way, where one of --standalone and --initial-voters must be specified with the initial value of the dynamic controller quorum. In this case, kraft.version must be set to 1.
This PR moves the formatting logic out of StorageTool.scala and into Formatter.java. The tool file was never intended to get so huge, or to implement complex logic like generating metadata records. Those things should be done by code in the metadata or raft gradle modules. This is also useful for junit tests, which often need to do formatting. (The 'info' and 'random-uuid' commands remain in StorageTool.scala, for now.)
Reviewers: José Armando García Sancio <jsancio@apache.org>
This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR actually catches processing exceptions from punctuate.
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
As part of KIP-950, we want to split the RemoteLogManagerScheduledThreadPool into separate thread pools (one for copy and another for expiration). In this change, we are splitting it into three thread pools (one for copy, one for expiration, and another one for follower). We are reusing the same thread pool configuration for all three thread pools. We can introduce new user-facing configurations later.
Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>, Satish Duggana <satishd@apache.org>
ShareGroupHeartbeat API support as defined in KIP-932. The heartbeat persists Group and Member information on __consumer_offsets topic.
The PR also moves some of the ShareGroupConfigs to GroupCoordinatorConfigs as they should only be used in group coordinator.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Defined share group, member and sinmple assignor classes with API definition for Share Group Heartbeat and Describe API.
The ShareGroup and ShareGroupMember extends the common ModernGroup and ModernGroupMember respectively.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Implemented the functionality which takes care of archiving the records when LSO moves past them. Implemented the following functions -
1. updateCacheAndOffsets - Updates the cached state, start and end offsets of the share partition as per the new log start offset. The method is called when the log start offset is moved for the share partition.
2. archiveAvailableRecordsOnLsoMovement - This function archives all the available records when they are behind the LSO.
3. archivePerOffsetBatchRecords - It archives all the available records in the per offset tracked batch passed to this function.
4. archiveCompleteBatch - It archives all the available records of the complete batch passed to this function.
Reviewers: Andrew Schofield <aschofield@confluent.io>,Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
About
This PR adds acknowledge code in SharePartitionManager. Internally, the record acknowledgements happen at the SharePartition level. SharePartitionManager identifies the SharePartitions and calls their acknowledge method to actually acknowledge the individual records
Testing
Added unit tests to cover the new functionality added in SharePartitionManagerTest
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>