This patch does two things:
1) Change the validation of the ConsumerGroupHeartbeat request to accept subscribed topic names and/or subscribed topic regex. At least of them must be set in the first request with epoch 0.
2) Validate the provided regular expression by compiling it.
Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
Instead of waiting until Tasks are assigned to us, we pre-emptively
create a StandbyTask for each non-empty Task directory found on-disk.
We do this before starting any StreamThreads, and on our first
assignment (after joining the consumer group), we recycle any of these
StandbyTasks that were assigned to us, either as an Active or a
Standby.
We can't just use these "initial Standbys" as-is, because they were
constructed outside the context of a StreamThread, so we first have to
update them with the context (log context, ChangelogReader, and source
topics) of the thread that it has been assigned to.
The motivation for this is to (in a later commit) read StateStore
offsets for unowned Tasks from the StateStore itself, rather than the
.checkpoint file, which we plan to deprecate and remove.
There are a few additional benefits:
Initializing these Tasks on start-up, instead of on-assignment, will
reduce the time between a member joining the consumer group and beginning
processing. This is especially important when active tasks are being moved
over, for example, as part of a rolling restart.
If a Task has corrupt data on-disk, it will be discovered on startup and
wiped under EOS. This is preferable to wiping the state after being
assigned the Task, because another instance may have non-corrupt data and
would not need to restore (as much).
There is a potential performance impact: we open all on-disk Task
StateStores, and keep them all open until we have our first assignment.
This could require large amounts of memory, in particular when there are
a large number of local state stores on-disk.
However, since old local state for Tasks we don't own is automatically
cleaned up after a period of time, in practice, we will almost always
only be dealing with the state that was last assigned to the local
instance.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>, Matthias Sax <mjsax@apache.org>
Adds the DefaultStatePersister and other supporting classes for managing share state.
* Added DefaultStatePersister implementation. This is the entry point for callers who wish to invoke the share state RPC API.
* Added PersisterStateManager which is used by DefaultStatePersister to manage and send the RPCs over the network.
* Added code to BrokerServer and BrokerMetadataPublisher to instantiate the appropriate persister based on the config value for group.share.persister.class.name. If this is not specified, the DefaultStatePersister will be used. To force use of NoOpStatePersister, set the config to empty. This is an internal config, not to be exposed to the end user. This will be used to factory plug the appropriate persister.
* Using this persister, the internal __share_group_state topic will come to life and will be used for persistence of share group info.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
This allows shutting down a KafkaClusterTestKit from a JVM shutdown hook without risking error logs because the base directory has already been deleted by the shutdown hook TestUtils.tempDirectory sets up.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
In Kafka 4.0, by default the consumer uses the updated consumer group protocol defined in KIP-848. When the consumer is used with a cluster that does not support (or is not configured to use) the new protocol, the consumer will get an unfriendly error about unavailable APIs. Since this error could be the user's first impression when attempting to upgrade to 4.0, we need to make sure that the error is very clear about the remediation steps (set the group.protocol to CLASSIC on the client or upgrade and enable the new protocol on the cluster).
Reviewers: Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>
As part of KIP-932, a new internal topic __share_group_state was introduced. There are 2 types of records which are currently being added in this topic - ShareSnapshotKey/Value and ShareUpdateKey/Value
In light of this, we must make the existing tooling like kafka-console-consumer and kafka-dump-log aware of these records for debugging and introspection purposes.
This PR introduces ShareGroupStateMessageFormatter to be used used with kafka-console-consumer and adds an internal class ShareGroupStateMessageParser in DumpLogSegments.scala.
Unit tests have been added to DumpLogSegmentsTest.scala
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
After MirrorMaker 1 removal, there are no other modules dependencies for these classes, so we can safely move them to tools module.
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>
Create a schema checker that can validate that later versions of a KRPC schema are compatible with earlier ones.
Reviewers: David Arthur <mumrah@gmail.com>
This patch completely removes the compile-time dependency on core for both test and main sources by introducing two new modules.
1) `test-common` include all the common test implementation code (including dependency on :core for BrokerServer, ControllerServer, etc)
2) `test-common:api` new sub-module that just includes interfaces including our junit extension
Reviewers: David Arthur <mumrah@gmail.com>
This patch introduces a merging algorithm for persistent state batches in the share coordinator.
The algorithm removes any expired batches (lastOffset before startOffset) and then places the rest in a sorted map. It then identifies batch pairs which overlap and combine them while preserving the relative priorities of any intersecting sub-ranges. The resultant batches are placed back into the map. The algorithm ends when no more overlapping pairs can be found.
Reviewers: Andrew Schofield <aschofield@confluent.io>, David Arthur <mumrah@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>, Jun Rao <junrao@gmail.com>
With EOSv1 removed, we don't need to create a producer per task, and thus can simplify the code by removing KafkaClientSupplier from the deeply nested StreamsProducer, to simplify the code.
Reviewers: Bill Bejeck <bill@confluent.io>
Introduced a share fetch purgatory on the broker which delays share fetch requests that cannot be completed instantaneously. Introduced 2 new classes -
DelayedShareFetch - Contains logic to instantaneously complete or force complete a share fetch request on timeout.
DelayedShareFetchKey - Contains the key which can be used to watch the entries within the share fetch purgatory.
ShareFetchUtils - This utility class contains functionalities required for post-processing once the replica manager fetch is completed.
There are many scenarios which can cause a share fetch request to be delayed and multiple scenarios when a delayed share fetch can be attempted to be completed. In this PR, we are only targeting the case when record lock partition limit is reached, the ShareFetch should wait for up to MaxWaitMs for records to be released.
Reviewers: David Arthur <mumrah@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Jun Rao <junrao@gmail.com>
Introduces the share coordinator. This coordinator is built on the new coordinator runtime framework. It
is responsible for persistence of share-group state in a new internal topic named "__share_group_state".
The responsibility for being a share coordinator is distributed across the brokers in a cluster.
Reviewers: David Arthur <mumrah@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
The COPY_SEGMENT_STARTED state segments are counted when calculating remote retention size. This causes unexpected retention size breached segment deletion. This PR fixes it by
1. only counting COPY_SEGMENT_FINISHED and DELETE_SEGMENT_STARTED state segments when calculating remote log size.
2. During copy Segment, if we encounter errors, we will delete the segment immediately.
3. Tests added.
Co-authored-by: Guillaume Mallet <>
Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>, Guillaume Mallet <>
The patch adds support of alter/describe configs for group in kafka-configs.sh.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
Introduce ShareCoordinator interface and related classes.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, David Arthur <mumrah@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Add the version check to client side when building ListOffsetRequest for the specific timestamp:
1) the version must be >=8 if timestamp=-4L (EARLIEST_LOCAL_TIMESTAMP)
2) the version must be >=9 if timestamp=-5L (LATEST_TIERED_TIMESTAMP)
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This patch introduces a wrapper around [HdrHistogram](https://github.com/HdrHistogram/HdrHistogram) to use for group coordinator histograms, event queue time, event processing time, flush time, and purgatory time.
Reviewers: David Jacot <djacot@confluent.io>
Establishes the new `:share` Gradle module. This module is intended to be used for server-side KIP-932 classes that are not part of the new share group coordinator.
This patch relocates and renames some existing classes. A small amount of compatibility changes were also made, but do not affect any logic.
Reviewers: Andrew Schofield <aschofield@confluent.io>, David Arthur <mumrah@gmail.com>
What
Introduced a new module share-coordinator to house relevant implementation code and resources.
Modified settings.gradle and build.gradle to accommodate the module.
Added ShareSnapshot[Key, Value], ShareUpdate[Key, Value] message record schemas.
Introduced a trivial impl of ShareCoordinatorShard class to establish dependencies with other modules (:coordinator-common, :metadata). The actual impl for this class will be done in future PRs.
Why
The share coordinator component has been introduced as part of KIP-932 (QFK). This component is will be responsible for managing persistence for various data related to share partitions into a dedicated internal topic.
To keep all this functionality contained, we want to create a separate module in line with group and transaction coordinators.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
There is a lot of code in group-coordinator which is not share/consumer/classic group specific.
Since we are introducing a share-coordinator as part of KIP-932 (in a new module), it would make sense to get the common coordinator functionality into a separate common coordinator module so that share-coordinator need not depend on group-coordinator.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
In MetadataVersion 3.7-IV2 and above, the broker's AssignmentsManager sends an RPC to the
controller informing it about which directory we have chosen to place each new replica on.
Unfortunately, the code does not check to see if the topic still exists in the MetadataImage before
sending the RPC. It will also retry infinitely. Therefore, after a topic is created and deleted in
rapid succession, we can get stuck including the now-defunct replica in our subsequent
AssignReplicasToDirsRequests forever.
In order to prevent this problem, the AssignmentsManager should check if a topic still exists (and
is still present on the broker in question) before sending the RPC. In order to prevent log spam,
we should not log any error messages until several minutes have gone past without success.
Finally, rather than creating a new EventQueue event for each assignment request, we should simply
modify a shared data structure and schedule a deferred event to send the accumulated RPCs. This
will improve efficiency.
Reviewers: Igor Soarez <i@soarez.me>, Ron Dagostino <rndgstn@gmail.com>
This PR adds support for add-controller and remove-controller in the kafka-metadata-quorum.sh
command-line tool. It also fixes some minor server-side bugs that blocked the tool from working.
In kafka-metadata-quorum.sh, the implementation of remove-controller is fairly straightforward. It
just takes some command-line flags and uses them to invoke AdminClient. The add-controller
implementation is a bit more complex because we have to look at the new controller's configuration
file. The parsing logic for the advertised.listeners and listeners server configurations that we
need was previously implemented in the :core module. However, the gradle module where
kafka-metadata-quorum.sh lives, :tools, cannot depend on :core. Therefore, I moved listener parsing
into SocketServerConfigs.listenerListToEndPoints. This will be a small step forward in our efforts
to move Kafka configuration out of :core.
I also made some minor changes in kafka-metadata-quorum.sh and Kafka-storage-tool.sh to handle
--help without displaying a backtrace on the screen, and give slightly better error messages on
stderr. Also, in DynamicVoter.toString, we now enclose the host in brackets if it contains a colon
(as IPV6 addresses can).
This PR fixes our handling of clusterId in addRaftVoter and removeRaftVoter, in two ways. Firstly,
it marks clusterId as nullable in the AddRaftVoterRequest.json and RemoveRaftVoterRequest.json
schemas, as it was always intended to be. Secondly, it allows AdminClient to optionally send
clusterId, by using AddRaftVoterOptions and RemoveRaftVoterOptions. We now also remember to
properly set timeoutMs in AddRaftVoterRequest. This PR adds unit tests for
KafkaAdminClient#addRaftVoter and KafkaAdminClient#removeRaftVoter, to make sure they are sending
the right things.
Finally, I fixed some minor server-side bugs that were blocking the handling of these RPCs.
Firstly, ApiKeys.ADD_RAFT_VOTER and ApiKeys.REMOVE_RAFT_VOTER are now marked as forwardable so that
forwarding from the broker to the active controller works correctly. Secondly,
org.apache.kafka.raft.KafkaNetworkChannel has now been updated to enable API_VERSIONS_REQUEST and
API_VERSIONS_RESPONSE.
Co-authored-by: Murali Basani muralidhar.basani@aiven.io
Reviewers: José Armando García Sancio <jsancio@apache.org>, Alyssa Huang <ahuang@confluent.io>
The purpose of this PR is to remove ConsumerTestBuilder.java since it is no longer needed. The following PRs have eliminated the use of ConsumerTestBuilder:
#14930#16140#16200#16312
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This pr support EarliestLocalSpec LatestTierSpec in GetOffsetShell, and add integration tests.
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang <payang@apache.org>
As part of KIP-853, storage-tool.sh now has two new flags: --standalone, and --initial-voters. This PR implements these two flags in storage-tool.sh.
There are currently two valid ways to format a cluster:
The pre-KIP-853 way, where you use a statically configured controller quorum. In this case, neither --standalone nor --initial-voters may be specified, and kraft.version must be set to 0.
The KIP-853 way, where one of --standalone and --initial-voters must be specified with the initial value of the dynamic controller quorum. In this case, kraft.version must be set to 1.
This PR moves the formatting logic out of StorageTool.scala and into Formatter.java. The tool file was never intended to get so huge, or to implement complex logic like generating metadata records. Those things should be done by code in the metadata or raft gradle modules. This is also useful for junit tests, which often need to do formatting. (The 'info' and 'random-uuid' commands remain in StorageTool.scala, for now.)
Reviewers: José Armando García Sancio <jsancio@apache.org>
This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR actually catches processing exceptions from punctuate.
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
As part of KIP-950, we want to split the RemoteLogManagerScheduledThreadPool into separate thread pools (one for copy and another for expiration). In this change, we are splitting it into three thread pools (one for copy, one for expiration, and another one for follower). We are reusing the same thread pool configuration for all three thread pools. We can introduce new user-facing configurations later.
Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>, Satish Duggana <satishd@apache.org>
ShareGroupHeartbeat API support as defined in KIP-932. The heartbeat persists Group and Member information on __consumer_offsets topic.
The PR also moves some of the ShareGroupConfigs to GroupCoordinatorConfigs as they should only be used in group coordinator.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Defined share group, member and sinmple assignor classes with API definition for Share Group Heartbeat and Describe API.
The ShareGroup and ShareGroupMember extends the common ModernGroup and ModernGroupMember respectively.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Implemented the functionality which takes care of archiving the records when LSO moves past them. Implemented the following functions -
1. updateCacheAndOffsets - Updates the cached state, start and end offsets of the share partition as per the new log start offset. The method is called when the log start offset is moved for the share partition.
2. archiveAvailableRecordsOnLsoMovement - This function archives all the available records when they are behind the LSO.
3. archivePerOffsetBatchRecords - It archives all the available records in the per offset tracked batch passed to this function.
4. archiveCompleteBatch - It archives all the available records of the complete batch passed to this function.
Reviewers: Andrew Schofield <aschofield@confluent.io>,Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
About
This PR adds acknowledge code in SharePartitionManager. Internally, the record acknowledgements happen at the SharePartition level. SharePartitionManager identifies the SharePartitions and calls their acknowledge method to actually acknowledge the individual records
Testing
Added unit tests to cover the new functionality added in SharePartitionManagerTest
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
The implementation for share-fetch next-fetch-offset in share partition and acquiring records from log.
The Next Fetch Offset (NFO) determines where the Share Partition should initiate the next data read from the Replica Manager. While it typically aligns with the last offset of the most recently returned batch, last offset + 1, there are exceptions. Messages marked available again due to release acknowledgements or lock timeouts can cause the NFO to shift.
The acquire method caches the batches as acquired in-memory and spawns a timer task for lock timeout.
Cache
Per-offset Metadata: Simple to implement but inefficient. Every offset requires in-memory storage and traversal, leading to high memory usage and processing overhead, especially for per-batch acknowledgements (mostly the way records would be acknowledged).
Per-Replica Fetch Batch: This approach aligns with the Replica Manager fetch batches. Since a full Replica Manager batch is retrieved whenever the requested offset falls within that batch's boundaries, a single Share Fetch request will likely receive an entire Replica Manager batch. However, there's a trade-off. Replica Manager batches are based on producer batching. If producers don't batch effectively, the in-flight metadata becomes heavily reliant on the producer's batching behavior.
For per-message acknowledgements, per-offset tracking will be necessary which again requires splitting in-flight batches based on state. Splitting bacthes is inefficient as it requires cache update wshich maintains sorted order. Therefore, we propose a hybrid approach:
Implemented a combination of option 2 (per-in-flight batch tracking) with option 1 (per-offset tracking). This aligns well with Replica Manager batching.
States shall be maintained per in-flight batch. If state inconsistencies arise within in-flight batches due to per-message acknowledgements, switch state tracking for the respective batch to option 1 (per-offset tracking).
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>, Abhinav Dixit <144765188+adixitconfluent@users.noreply.github.com>
- Added the integration of the quota manager to throttle copy requests to the remote storage. Reference KIP-956
- Added unit-tests for the copy throttling logic.
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>
Added more testing for the StickyTaskAssignor, which includes a large-scale test with rack aware enabled.
Also added a no-op change to StreamsAssignmentScaleTest.java to allow for rack aware optimization testing.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Fixed the calculation of the store name list based on the subtopology being accessed.
Also added a new test to make sure this new functionality works as intended.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Allow KRaft replicas to send requests to any node (Node) not just the nodes configured in the
controller.quorum.voters property. This flexibility is needed so KRaft can implement the
controller.quorum.voters configuration, send request to the dynamically changing set of voters and
send request to the leader endpoint (Node) discovered through the KRaft RPCs (specially
BeginQuorumEpoch request and Fetch response).
This was achieved by changing the RequestManager API to accept Node instead of just the replica ID.
Internally, the request manager tracks connection state using the Node.idString method to match the
connection management used by NetworkClient.
The API for RequestManager is also changed so that the ConnectState class is not exposed in the
API. This allows the request manager to reclaim heap memory for any connection that is ready.
The NetworkChannel was updated to receive the endpoint information (Node) through the outbound raft
request (RaftRequent.Outbound). This makes the network channel more flexible as it doesn't need to
be configured with the list of all possible endpoints. RaftRequest.Outbound and
RaftResponse.Inbound were updated to include the remote node instead of just the remote id.
The follower state tracked by KRaft replicas was updated to include both the leader id and the
leader's endpoint (Node). In this comment the node value is computed from the set of voters. In
future commit this will be updated so that it is sent through KRaft RPCs. For example
BeginQuorumEpoch request and Fetch response.
Support for configuring controller.quorum.bootstrap.servers was added. This includes changes to
KafkaConfig, QuorumConfig, etc. All of the tests using QuorumTestHarness were changed to use the
controller.quorum.bootstrap.servers instead of the controller.quorum.voters for the broker
configuration. Finally, the node id for the bootstrap server will be decreasing negative numbers
starting with -2.
Reviewers: Jason Gustafson <jason@confluent.io>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Added the implementation of the quota manager that will be used to throttle copy and fetch requests from the remote storage. Reference KIP-956
Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kchandraprakash@uber.com>, Jun Rao <junrao@gmail.com>
This patch implements the LeaveGroup API to the consumer groups that are in the mixed mode.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
Implements KIP-1036.
Add raw ConsumerRecord data to RecordDeserialisationException to make DLQ implementation easier.
Reviewers: Kirk True <ktrue@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Matthias J. Sax <matthias@confluent.io>
1. Replace TopicBasedRemoteLogMetadataManagerWrapperWithHarness with RemoteLogMetadataManagerTestUtils#builder in RemoteLogMetadataManagerTest.
2. Use ClusterTestExtention for RemoteLogMetadataManagerTest.
Signed-off-by: PoAn Yang <payang@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>