Apache Kafka 4.0 will only support KRaft and 3.0-IV1 is the minimum version supported by KRaft. So, we can assume that Apache Kafka 4.0 will only communicate with brokers that are 3.0-IV1 or newer.
Note that KRaft was only marked as production-ready in 3.3, so we could go further and set the baseline to 3.3. I think we should have that discussion, but it made sense to start with the non controversial parts.
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <david.jacot@gmail.com>
A simplified port of "CopartitionedTopicsEnforcer" from the client-side to the group coordinator.
This class is responsible for enforcing the number of partitions in copartitioned topics. For each copartition group, it checks whether the number of partitions for all topics in the group is the same, and enforces the right number of partitions for repartition topics whose number of partitions is not enforced by the topology.
Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client.
Reviewers: Bruno Cadonna <cadonna@apache.org>
This patch updates the GroupCoordinatorSerde and the ShareGroupCoordinatorSerde to leverage the CoordinatorRecordType to deserialize records. With this, newly added record are automatically picked up. In other words, the serdes work with all defined records without doing anything.
Reviewers: Andrew Schofield <aschofield@confluent.io>
A simplified port of "RepartitionTopics" from the client-side to the group coordinator.
Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Adds a class that represent the topology of a Streams group sent by a Streams client in the Streams group heartbeat during initialization to the group coordinator.
This topology representation is used together with the partition metadata on the broker to create a configured topology.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
* KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes
This commit adds the classes to represent a Streams group member in the
consumer coordinator.
Reviewers: Bill Bejeck <bill@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
A simplified port of "ChangelogTopics" from the client-side to the group coordinator
Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Clients in the Streams Rebalance Protocol send an "unconfigured" representation of the topology to the broker. That is, the number of input topics and (some) internal topics is not fixed, regular expressions are not resolved. The broker takes this description of the topology and, together with the current state of the topics on the broker, derives a ConfiguredTopology. The configured topology is what is being returned from StreamsGroupDescribe, and has all number of partitions defined, and regular expressions resolved. The configured topology also contains missing internal topics that need to be created, and potentially configuration errors, such as missing source topics.
In this change, we add the internal data structures for representing the configured topology. They differ in some details from the data structures used in the RPCs. Most importantly, they can be evolved independently of the public interface.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Introduces interfaces for defining task assignors. Task assignors are pure functions, mapping the state of the group and a topology to a target assignment. We include a mock assignor, which we will be able to use when testing / benchmarking without the complexities of the sticky task assignor and the high-availability task assignor. We may remove the mock assignor in before the streams rebalance protocol goes GA.
The consumer groups introduce these interfaces to establish a clear separation between the group coordinator code and the pluggable assignors, which may live outside the group coordinator code. We have removed pluggable assignors in KIP-1071, but I think it still makes sense to keep these interfaces for having a clean interface for people to code against. This will pay off, if we plan on making the task assignors pluggable later.
Reviewers: Bill Bejeck <bbejeck@gmail.com>, David Jacot <djacot@confluent.io>
Included in this change:
1. Remove deprecated protocol api versions from json files.
3. Remove fields that are no longer used from json files (affects ListOffsets, OffsetCommit, DescribeConfigs).
4. Remove record down-conversion support from KafkaApis.
5. No longer return `Errors.UNSUPPORTED_COMPRESSION_TYPE` on the fetch path[1].
6. Deprecate `TopicConfig. MESSAGE_DOWNCONVERSION_ENABLE_CONFIG` and made the relevant
configs (`message.downconversion.enable` and `log.message.downcoversion.enable`) no-ops since
down-conversion is no longer supported. It was an oversight not to deprecate this via KIP-724.
7. Fix `shouldRetainsBufferReference` to handle null request schemas for a given version.
8. Simplify producer logic since it only supports the v2 record format now.
9. Fix tests so they don't exercise protocol api versions that have been removed.
10. Add upgrade note.
Testing:
1. System tests have a lot of failures, but those tests fail for trunk too and I didn't see any issues specific to this change - it's hard to be sure given the number of failing tests, but let's not block on that given the other testing that has been done (see below).
3. Java producers and consumers with version 0.9-0.10.1 don't have api versions support and hence they fail in an ungraceful manner: the broker disconnects and the clients reconnect until the relevant timeout is triggered.
4. Same thing seems to happen for the console producer 0.10.2 although it's unclear why since api versions should be supported. I will look into this separately, it's unlikely to be related to this PR.
5. Console consumer 0.10.2 fails with the expected error and a reasonable message[2].
6. Console producer and consumer 0.11.0 works fine, newer versions should naturally also work fine.
7. kcat 1.5.0 (based on librdkafka 1.1.0) produce and consume fail with a reasonable message[3][4].
8. kcat 1.6.0-1.7.0 (based on librdkafka 1.5.0 and 1.7.0 respectively) consume fails with a reasonable message[5].
9. kcat 1.6.0-1.7.0 produce works fine.
10. kcat 1.7.1 (based on librdkafka 1.8.2) works fine for consumer and produce.
11. confluent-go-client (librdkafka based) 1.8.2 works fine for consumer and produce.
12. I will test more clients, but I don't think we need to block the PR on that.
Note that this also completes part of KIP-724: produce v2 and lower as well as fetch v3 and lower are no longer supported.
Future PRs will remove conditional code that is no longer needed (some of that has been done in KafkaApis,
but only what was required due to the schema changes). We can probably do that in master only as it does
not change behavior.
Note that I did not touch `ignorable` fields even though some of them could have been
changed. The reasoning is that this could result in incompatible changes for clients
that use new protocol versions without setting such fields _if_ we don't manually
validate their presence. I will file a JIRA ticket to look into this carefully for each
case (i.e. if we do validate their presence for the appropriate versions, we can
set them to ignorable=false in the json file).
[1] We would return this error if a fetch < v10 was used and the compression topic config was set
to zstd, but we would not do the same for the case where zstd was compressed at the producer
level (the most common case). Since there is no efficient way to do the check for the common
case, I made it consistent for both by having no checks.
[2] ```org.apache.kafka.common.errors.UnsupportedVersionException: The broker is too new to support JOIN_GROUP version 1```
[3]```METADATA|rdkafka#producer-1| [thrd:main]: localhost:9092/bootstrap: Metadata request failed: connected: Local: Required feature not supported by broker (0ms): Permanent```
[4]```METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Metadata request failed: connected: Local: Required feature not supported by broker (0ms): Permanent```
[5] `ERROR: Topic test-topic [0] error: Failed to query logical offset END: Local: Required feature not supported by broker`
Reviewers: David Arthur <mumrah@gmail.com>
This patch is the first one in a series to improve how coordinator records are managed. It focuses on making coordinator records first class citizen in the generator.
* Introduce `coordinator-key` and `coordinator-value` in the schema;
* Introduce `apiKey` for those. This is done to avoid relying on the version to determine the type.
* It also allows the generator to enforce some rules: the key cannot use flexible versions, the key must have a single version `0`, there must be a key and a value for a given api key, etc.
* It generates an enum with all the coordinator record types. This is pretty handy in the code.
The patch also updates the group coordinators to use those.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Andrew Schofield <aschofield@confluent.io>
KIP-1071 specifies records that the group coordinator needs to store
into the consumer offset topic to persist the state of a Streams
group. This records are specified in json files from which the actual
classes for the records are generated.
This commit adds the records needed by the group coordinator to store
the state of a Streams group.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
KIP-848 will be release as GA in Apache Kafka 4.0. Hence we need to mark all the related public apis as stable.
Reviewers: Jeff Kim <jeff.kim@confluent.io>
Kafka consumer supports auto.offset.reset config option, which is used when there is no initial offset in Kafka (or) if the current offset does not exist any more on the server. This config currently supports earliest/latest/none options. Currently consumer resets might force applications to reprocess large amounts of data from earlier offsets. With infinite storage, its beneficial to have a duration based offset reset strategy. This will allow applications to consume/initialise from a fixed duration when there is no initial offset in Kafka.
As part of KIP-932, we are adding support for share consumer groups. Share consumer groups supports dynamic group configuration property share.auto.offset.reset. This is used to set the initial Share-Partition Start Offset (SPSO) based on the share.auto.offset.reset configuration. Currently share.auto.offset.reset supports earliest and latest options to automatically reset the offset
Similar to the Kafka Consumer, we will add support for by_duration: config value for share.auto.offset.reset.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
We have observed the below errors in some cluster:
Uncaught exception in scheduled task 'handleTxnCompletion-902667' exception.message:Trying to complete a transactional offset commit for producerId *** and groupId *** even though the offset commit record itself hasn't been appended to the log.
When a transaction is completed, the transaction coordinator sends a WriteTxnMarkers request to all the partitions involved in the transaction to write the markers to them. When the broker receives it, it writes the markers and if markers are written to the __consumer_offsets partitions, it informs the group coordinator that it can materialize the pending transactional offsets in its main cache. The group coordinator does this asynchronously since Apache Kafka 2.0, see this patch.
The above error appends when the asynchronous operation is executed by the scheduler and the operation finds that there are pending transactional offsets that were not written yet. How come?
There is actually an issue is the steps described above. The group coordinator does not wait until the asynchronous operation completes to return to the api layer. Hence the WriteTxnMarkers response may be send back to the transaction coordinator before the async operation is actually completed. Hence it is possible that the next transactional produce to be started also before the operation is completed too. This could explain why the group coordinator has pending transactional offsets that are not written yet.
There is a similar issue when the transaction is aborted. However on this path, we don't have any checks to verify whether all the pending transactional offsets have been written or not so we don't see any errors in our logs. Due to the same race condition, it is possible to actually remove the wrong pending transactional offsets.
PS: The new group coordinator is not impacted by this bug.
Reviewers: Justine Olshan <jolshan@confluent.io>
This pull request replaces Log4j with Log4j2 across the entire project, including dependencies, configurations, and code. The notable changes are listed below:
1. Introduce Log4j2 Instead of Log4j
2. Change Configuration File Format from Properties to YAML
3. Adds warnings to notify users if they are still using Log4j properties, encouraging them to transition to Log4j2 configurations
Co-authored-by: Lee Dongjin <dongjin@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The LeaveGroup API is used by the admin client to remove static members or remove all members from the group. The latter does not work because the API does not allow removing a member using the CONSUMER protocol by member id. Moreover, the response should only include the member id if the member id was included in the request. This patch fixes both issues.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Christo Lolov <lolovc@amazon.com>, Jeff Kim <jeff.kim@confluent.io>
Disallow upgrades from classic groups to consumer groups when any member's assignment has non-empty userData.
Reviewers: David Jacot <djacot@confluent.io>
Improve descriptive information in Kafka protocol documentation.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
Removes the client side AddPartitionsToTxn/AddOffsetsToTxn calls so that the partition is implicitly added as part of KIP-890 part 2.
This change also requires updating the valid state transitions. The client side can not know for certain if a partition has been added server side when the request times out (partial completion). Thus for TV2, the transition to PrepareAbort is now valid for Empty, CompleteCommit, and CompleteAbort.
For readability, the V1 and V2 endTransaction methods have been separated.
Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan <jolshan@confluent.io>, Ritika Reddy <rreddy@confluent.io>
This is the last patch in the series which introduces regular expressions in the new consumer group protocol. The patch ensures that the subscription type of the group takes into account the regular expressions. Please refer to the code to see how they are included.
Reviewers: Sean Quah <squah@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
This PR introduces the DescribeGroups v6 API as part of KIP-1043. This adds an error message for the described groups so that it is possible to get some context on the error. It also changes the behaviour for when the group ID cannot be found but returning error code GROUP_ID_NOT_FOUND rather than NONE.
Reviewers: David Jacot <djacot@confluent.io>
With the addition of the SubscribedTopicRegex field to the ConsumerGroupHeartbeat request, we need to update the definition of a full request. This patch does so.
Reviewers: Lianet Magrans <lmagrans@confluent.io>
This patch does a few things:
1) It cleans up resolved regular expressions when they are unsubscribed from. It covers the regular leave/fenced paths for the new protocol and it also covers the LeaveGroup API as new members could be removed via the admin API.
2) It ensures that tombstones for resolved regular expressions are generated on the conversion patch from consumer to classic group.
3) It fixes [KAFKA-18116](https://issues.apache.org/jira/browse/KAFKA-18116) because I faced the same issue while working on the LeaveGroup API. It adds an integration test for this case too.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
1) Bump validVersions of ConsumerGroupDescribeRequest.json and ConsumerGroupDescribeResponse.json to "0-1".
2) Add MemberType field to ConsumerGroupDescribeResponse.json. Default value is -1 (unknown). 0 is for classic member and 1 is for consumer member.
3) When ConsumerGroupMember#useClassicProtocol is true, return MemberType field as 0. Otherwise, return 1.
Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
There are two issues in KAFKA-18060:
1) New coordinator can't handle the TxnOffsetCommitRequest with empty member id, and TxnOffsetCommitRequest v0-v2 do definitely has an empty member ID, causing ConsumerGroup#validateOffsetCommit to throw an UnknownMemberIdException. This prevents the old producer from calling sendOffsetsToTransaction. Note that TxnOffsetCommitRequest versions v0-v2 are included in KIP-896, so it seems the new coordinator should support that operations
2) The deprecated API Producer#sendOffsetsToTransaction does not use v0-v2 to send TxnOffsetCommitRequest with an empty member ID. Unfortunately, it has been released for a while. Therefore, the new coordinator needs to handle TxnOffsetCommitRequest with an empty member ID for all versions.
Taken from the two issues above, we need to handle empty member id in all API versions when new coordinator are dealing with TxnOffsetCommitRequest.
Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This patch relaxes requiring non-empty subscribed names and regex in the full heartbeat request. Without this, a consumer using client side regexes may not be able to join the group when the regex does not match any topics yet and this is inconsistent with the old protocol. Relaxing the subscribed regex is not strictly required but it seems better to keep it consistent.
Reviewers: Lianet Magrans <lmagrans@confluent.io>
This patch introduces the asynchronous resolution of regular expressions. Let me unpack a few details about the implementations:
1) I have decided to finally update all the regular expressions within a consumer group together. My assumption is that the number of regular expressions in a group will be generally small but the number of topics in a cluster is large. Hence grouping has two benefits. Firstly, it allows to go through the list of topics once for all the regular expressions. Secondly, it reduces the number of potential rebalances because all the regular expressions are updated at the same time.
2) An update is triggered when the group is subscribed to at least one regular expressions.
3) An update is triggered when there is no ongoing update.
4) An update is triggered only of the previous one is older than 10s.
5) An update is triggered when the group has unresolved regular expressions.
6) An update is triggered when the metadata image has new topics.
Reviewers: Jeff Kim <jeff.kim@confluent.io>
During conversion from classic to consumer group, if a member has empty assignment (e.g., the member just joined and has never synced), the conversion will fail because of the buffer underflow error when deserializing the member assignment. This patch allows empty assignment while deserializing the member assignment.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
This PR introduces the unified GroupState enum for all group types from KIP-1043. This PR also removes ShareGroupState and begins the work to replace Admin.listShareGroups with Admin.listGroups. That will complete in a future PR.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This patch introduces the `CoordinatorExecutor` construct into the `CoordinatorRuntime`. It allows scheduling asynchronous tasks from within a `CoordinatorShard` while respecting the runtime semantic. It will be used to asynchronously resolve regular expressions.
The `GroupCoordinatorService` uses a default `ExecutorService` with a single thread to back it at the moment. It seems that it should be sufficient. In the future, we could consider making the number of threads configurable.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This patch does a few things:
* Refactors the `TargetAssignmentBuilder` to use inheritance to differentiate Consumer and Share groups.
* Introduces `UnionSet` to lazily aggregate the subscriptions for a given member.
* Wires the resolved regular expressions in the `GroupMetadataManager`. At the moment, they are only used when the target assignment is computed.
Reviewers: Sean Quah <squah@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This patch cleans up the `Assertions` class in the `group-coordinator` module.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This patch does a few cleanups in GroupMetadataManagerTest:
* Uses `Map.of` where possible.
* Uses `List.of` instead of `Arrays.asList`.
* Fix inconsistent indentation in a few places.
Reviewers: Lianet Magrans <lmagrans@confluent.io>
This PR adds another dynamic config share.auto.offset.reset fir share groups.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Abhinav Dixit <adixit@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
This patch introduces the ConsumerGroupRegularExpression record (key + value) and updates the `GroupMatadataManager` and the `ConsumerGroup` to bookkeep it appropriately. Note that with this change, regular expressions are counted as subscribers in the `subscribedTopicNames` data structure. This is important because the topic metadata of the group is computed based on it.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This patch adds a data structure to ConsumerGroup to track the number of members subscribed to each regular expressions in the group. This will be useful to know whether a regex is new in the group or whether a regex must be removed from the group.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This patch does two things:
1) Change the validation of the ConsumerGroupHeartbeat request to accept subscribed topic names and/or subscribed topic regex. At least of them must be set in the first request with epoch 0.
2) Validate the provided regular expression by compiling it.
Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
Adds the DefaultStatePersister and other supporting classes for managing share state.
* Added DefaultStatePersister implementation. This is the entry point for callers who wish to invoke the share state RPC API.
* Added PersisterStateManager which is used by DefaultStatePersister to manage and send the RPCs over the network.
* Added code to BrokerServer and BrokerMetadataPublisher to instantiate the appropriate persister based on the config value for group.share.persister.class.name. If this is not specified, the DefaultStatePersister will be used. To force use of NoOpStatePersister, set the config to empty. This is an internal config, not to be exposed to the end user. This will be used to factory plug the appropriate persister.
* Using this persister, the internal __share_group_state topic will come to life and will be used for persistence of share group info.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
The PR adds capability to restrict the messages in Share Fetch. The max fetch records will be an additional way to limit the number of records sent from broker to client.
In Share Fetch, with min and mx bytes, there exists 3 problems:
1. The max.poll.records client config sends the max number of records defined to application but might have fetched extra becuase of higher max bytes. But the timeout for the sent records has started on the broker.
2. As the application processes records as per max.poll.records, hence those number of records are sent in every acknowledgement. This causes the cache data to be tracked per offset as the batch is broken.
3. The client has to sent the partial acknoledgment batch and cannot piggyback on fetch requests.
To handle the above scenario max fetch records has been added. Once this PR is merged and we define the right methodolgy then KIP will be updated to have max fetch records in share fetch RPC rather as broker config.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
When a dynamic member joins a group, it sends two JOIN_GROUP requests.
The first request doesn't have a member id and returns
MEMBER_ID_REQUIRED with a broker-generated member id. The second request
includes the broker-generated member id and joins the group for real.
We emit a log message at info level for the first join request, but not
the second. Log the second join request at info level too.
Reviewers: David Jacot <djacot@confluent.io>
Rework the uniform heterogeneous assignor to improve performance, while
preserving the high level ideas and structure from the existing
implementation:
* The assignor works in 3 stages: importing the previous assignment for
stickiness, assigning unassigned partitions and iteratively
reassigning partitions to improve balance.
* Unassigned partitions are assigned to the subscribers with the least
number of partitions. This maximizes balance within a single topic.
* During the iterative rebalancing phase, partitions are reassigned to
their previous owner if it improves balance (stickiness restoration).
* During the iterative rebalancing phase, partitions are reassigned to
the subscriber with the least number of partitions to improve
balance.
A non-exhaustive list of changes is:
* The assignment of unassigned partitions and iterative reassignment
stages now works through partitions topic by topic. Previously
partitions from topics with the same number of partitions per
subscriber would be interleaved. Since we iterate topic by topic, we
can reuse data about topic subscribers.
* Instead of maintaining TreeSets to find the least loaded subscribers,
we sort an ArrayList of subscribers once per topic and start filling
up subscribers, least loaded first. In testing, this approach was
found to be faster than maintaining PriorityQueues.
* Implement stickiness restoration by creating a mapping of partitions
to previous owner and checking against that mapping, instead of
tracking partition movements during iterative reassignment.
* Track member partition counts using a plain int array, to avoid
overhead from boxing and HashMap lookups. Member partition counts are
accessed very frequently and this needs to be fast. As a consequence,
we have to number members 0 to M - 1.
* Bound the iterative reassignment stage to a fixed number of
iterations. Under some uncommon subscription patterns, the iterative
reassignment stage converges slowly. In these cases, the iterative
reassignment stage terminates without producing an optimally balanced
assignment anyway (see javadoc for balanceTopics).
* Re-use Maps from the previous assignment where possible,
ie. introduce a copy-on-write mechanism while computing the new
assignment.
Reviewers: David Jacot <djacot@confluent.io>
This PR adds the 2 configs share.heartbeat.interval.ms and share.session.timeout.ms in GroupConfig. These can be dynamically set for a share group without restarting the server
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This PR is the first series in the attempt to add share.record.lock.duration.ms to dynamic group configs. As part of this PR, the ShareGroupConfig has been moved to org.apache.kafka.coordinator.group.modern.share
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Abhinav Dixit <adixit@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This patch fixes a few things:
* Typos.
* Merge the tests for fetchOffsets and fetchAllOffsets together into parameterized tests since they share the same structure.
* Use Topic.GROUP_METADATA_TOPIC_NAME instead of __consumer_offsets in new group coordinator tests.
Reviewers: Ken Huang <s7133700@gmail.com>, David Jacot <djacot@confluent.io>
This implementation doesn't change the existing downgrade path.
In `classicGroupJoinToConsumerGroup`, if the group should be downgraded, it will be converted to a classic group at the end of the method. The returned records will be the records from GroupJoin plus the records from conversion. No rebalance will be triggered in the newly converted group.
Reviewers: David Jacot <djacot@confluent.io>
In the existing implementation, If an operation modifying the classic group state fails, the group reverts but the group size counter does not. This creates an inconsistency between the group size metric and the actual group size.
Considering that It will be complicated to rely on the appendFuture to revert the metrics upon the operation failure, this PR introduces a new implementation. A timeout task will periodically refresh the metrics based on the current groups soft state. The refreshing interval is hardcoded to 60 seconds.
Reviewers: David Jacot <djacot@confluent.io>
While running large scale performance tests, we noticed that the logging on the ConsumerGroupHeartbeat path took a significant amount of CPU. It is mainly due to the very large data structures that we print out. I made a pass on those logs and I switched some of them to debug.
Reviewers: Lianet Magrans <lianetmr@gmail.com>
This patch includes:
- Bump ConsumerGroupHeartbeatRequest version to include subscribedTopicRegex field
- Introduce new error code for InvalidRegularExpression
- Bump ConsumerGroupHeartbeatResponse version to support new regex error
- Wire the new field into the GroupMetadataManager when processing HB
Reviewers: David Jacot <djacot@confluent.io>
The ModernGroup#subscribedTopicMetadata takes too much memory due to partitionRacks. This is not being used at the moment as the consumer protocol does not support rack aware assignments.
A heap dump from a group with 500 members, 2K subscribed topic partitions shows 654,400 bytes used for partitionRacks. The rest of the ConsumerGroup object holds 822,860 bytes.
Reviewers: David Jacot <djacot@confluent.io>
I am still chasing KAFKA-17493. I was able to narrow it down to an issue with the pending join members. This patch logs them in order to help me troubleshooting it further. I will revert this change when the issue is root caused.
Reviewers: David Arthur <mumrah@gmail.com>
This patch fixes a few buts in the replay logic of the consumer group records:
* The first issue is that the logic assumed that the group or the member exists when tombstones are replayed. Obviously, this is incorrect after a restart. The group or the member may not me there anymore if the __consumer_offsets partitions only contains tombstones for the group or the member. The patch fixes this by considering tombstones as no-ops if the entity does not exist.
* The second issue is that the logic assumed that consumer group records are always in a specific order in the log so the logic was only accepting to create a consumer group when `ConsumerGroupMemberMetadata` record is replayed. This is obviously incorrect too. During the life time of a consumer group, the records may be in different order. The patch fixes this by allowing the creating of a consumer group by any record.
* The third issue is that it is possible to replay offset commit records for a specific consumer group before the consumer group is actually created while replying its records. By default the OffsetMetadataManager creates a simple classic group to hold those offset commits. When the consumer offset records are finally replayed, the logic will fail because a classic group already exists. The patch fixes this by converting a simple classic group when records for a consumer group are replayed.
All those combinations are hard to test with unit tests. This patch adds an integration tests which reproduces some of those interleaving of records. I used them to reproduce the issues describe above.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Introduces the share coordinator. This coordinator is built on the new coordinator runtime framework. It
is responsible for persistence of share-group state in a new internal topic named "__share_group_state".
The responsibility for being a share coordinator is distributed across the brokers in a cluster.
Reviewers: David Arthur <mumrah@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
This PR ensures that using the various group RPCs work properly when issued against the wrong type of group, such as DescribeConsumerGroups for a share group, or ConsumerGroupHeartbeat for a share group. There are no changes to the RPC error codes required.
The significant code changes are:
Making sure that the group coordinator does not assume that only classic and consumer groups exist. This was the cause of a ClassCastException when ConsumerGroupHeartbeat was being used against a share group.
Making sure that committing offsets to a share group fails with GroupIdNotFoundException rather than java.lang.UnsupportedOperation. This was the cause of a name collision between a share group and a consumer group when using kafka-consumer-groups.sh --reset-offsets which inadvertently created a consumer group of the same name.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
The patch adds support of alter/describe configs for group in kafka-configs.sh.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
Introduce ShareCoordinator interface and related classes.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, David Arthur <mumrah@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch makes the new group coordinator, introduced as part of KIP-848, the default. This means that any KRaft cluster created from trunk defaults to using the new group coordinator. This includes all the integration tests which do not specify it. This patch also changes the default in system tests.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This patch introduces a wrapper around [HdrHistogram](https://github.com/HdrHistogram/HdrHistogram) to use for group coordinator histograms, event queue time, event processing time, flush time, and purgatory time.
Reviewers: David Jacot <djacot@confluent.io>
This patch updates getOrMaybeCreateClassicGroup to only throw GroupIdNotFoundException as we did for other internal methods. The callers are responsible for translating the error to the appropriate one depending on the context. There is only one case.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
There is a lot of code in group-coordinator which is not share/consumer/classic group specific.
Since we are introducing a share-coordinator as part of KIP-932 (in a new module), it would make sense to get the common coordinator functionality into a separate common coordinator module so that share-coordinator need not depend on group-coordinator.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This patch extends the DescribeConfigs API to support group configs.
Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
In KIP-932, the group coordinator does not persist assignments for share groups. While this sounds like a good idea in terms of minimising overhead for data which doesn't strictly need to be recoverable, it significantly adds to the complexity of working with the coordinator framework.
This PR revises the definitions of the share group record schemas following more closely the schemas used for consumer groups, and eliminating the need to maintain soft state alongside the group coordinator's timeline structure.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Add the "share" group coordinator rebalance protocol as the way to enable KIP-932. It is also necessary to turn on the new group coordinator.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This patch add resources to store and handle consumer group's config.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>, David Jacot <djacot@confluent.io>
Partition epochs are tracked for consumer groups where epoch is the current assigned member epoch. As share groups have partitions shared hence maintaing the partition epochs is not required.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This patch enables the static member using the old protocol to be replaced in ConsumerGroupHeartbeat even if it hasn't sent a request to leave the group.
This helps in cases where existing static member rejoins and triggers a group upgrade, because the classic static consumer doesn't send leave group request before shutting down.
Reviewers: TengYao Chi <kitingiao@gmail.com>, David Jacot <djacot@confluent.io>
When handling an OFFSET_FETCH request requiring stable offsets, the new
group coordinator may encounter a timeout under some circumstances, such
as a zombie coordinator or a lagging __consumer_offsets replica that has
not yet dropped out of the ISR. Existing and older clients do not expect
the REQUEST_TIMED_OUT error code won't retry, so remap it to
NOT_COORDINATOR to trigger a coordinator lookup and retry.
Reviewers: David Jacot <djacot@confluent.io>
* KAFKA-15875: Stops leak Snapshot in public methods
The Snapshot class is package protected but it's returned in
several public methods in SnapshotRegistry.
To prevent this accidental leakage, these methods are made
package protected as well. For getOrCreateSnapshot a new
method called IdempotentCreateSnapshot is created that returns void.
* Make builer package protected, replace <br> with <p>
Reviewers: Greg Harris <greg.harris@aiven.io>
Implemented handleShareFetch request RPC in KafkaApis.scala. This method is called whenever the client sends a Share Fetch request to the broker. Although Share Fetch request support acknowledgements, since the logic for acknowledging records is not completely implemented in SharePartitionManager.java class, this method currently includes placeholder code for acknowledging, which will be replaced by the actual functionality in the upcoming PRs.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Abhinav Dixit <adixit@confluent.io>, Jun Rao <junrao@gmail.com>
ShareGroupHeartbeat API support as defined in KIP-932. The heartbeat persists Group and Member information on __consumer_offsets topic.
The PR also moves some of the ShareGroupConfigs to GroupCoordinatorConfigs as they should only be used in group coordinator.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
The server side range assignor was made to be sticky i.e. partitions from the existing assignment are retained as much as possible. During a rebalance, the expected behavior is to achieve co-partitioning for members that are subscribed to the same set of topics with equal number of partitions.
However, there are cases where this cannot be achieved efficiently with the current algorithm. There is no easy way to implement stickiness and co-partitioning and hence we have resorted to recomputing the target assignment every time.
In case of static membership, instanceIds are leveraged to ensure some form of stickiness.
```
Benchmark (assignmentType) (assignorType) (isRackAware) (memberCount) (partitionsToMemberRatio) (subscriptionType) (topicCount) Mode Cnt Score Error Units
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 100 10 HOMOGENEOUS 100 avgt 5 0.052 ± 0.001 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 100 10 HOMOGENEOUS 1000 avgt 5 0.454 ± 0.003 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 1000 10 HOMOGENEOUS 100 avgt 5 0.476 ± 0.046 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 1000 10 HOMOGENEOUS 1000 avgt 5 3.102 ± 0.055 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 10000 10 HOMOGENEOUS 100 avgt 5 5.640 ± 0.223 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 10000 10 HOMOGENEOUS 1000 avgt 5 37.947 ± 1.000 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 100 10 HETEROGENEOUS 100 avgt 5 0.172 ± 0.001 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 100 10 HETEROGENEOUS 1000 avgt 5 1.882 ± 0.006 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 1000 10 HETEROGENEOUS 100 avgt 5 1.730 ± 0.036 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 1000 10 HETEROGENEOUS 1000 avgt 5 17.654 ± 1.160 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 10000 10 HETEROGENEOUS 100 avgt 5 18.595 ± 0.316 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 10000 10 HETEROGENEOUS 1000 avgt 5 172.398 ± 2.251 ms/op
JMH benchmarks done
Benchmark (memberCount) (partitionsToMemberRatio) (topicCount) Mode Cnt Score Error Units
TargetAssignmentBuilderBenchmark.build 100 10 100 avgt 5 0.071 ± 0.004 ms/op
TargetAssignmentBuilderBenchmark.build 100 10 1000 avgt 5 0.428 ± 0.026 ms/op
TargetAssignmentBuilderBenchmark.build 1000 10 100 avgt 5 0.659 ± 0.028 ms/op
TargetAssignmentBuilderBenchmark.build 1000 10 1000 avgt 5 3.346 ± 0.102 ms/op
TargetAssignmentBuilderBenchmark.build 10000 10 100 avgt 5 8.947 ± 0.386 ms/op
TargetAssignmentBuilderBenchmark.build 10000 10 1000 avgt 5 40.240 ± 3.113 ms/op
JMH benchmarks done
```
Reviewers: David Jacot <djacot@confluent.io>
Defined share group, member and sinmple assignor classes with API definition for Share Group Heartbeat and Describe API.
The ShareGroup and ShareGroupMember extends the common ModernGroup and ModernGroupMember respectively.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
The group coordinator has (internal) write operations that could generate a large number of records (e.g. expiring offsets and groups). At the moment, those operations are limited by the maximum message size. If they hit it, they are basically stuck forever. This patch extends the CoordinatorRuntime to support non-atomic writes and it changes those internal operations to be non-atomic.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
Following the discussion and suggestion by @dajac, https://github.com/apache/kafka/pull/16054#discussion_r1613638293, the PR refactors the common classes to build TargetAssignment in `modern` package. `consumer` package has been moved inside `modern` package with classes exclusive to `consumer group`.
This PR completes the refactoring and base to introduce `share` package inside `modern`. The subsequent PRs will define the implementation specific to Share Groups while re-using the common functionality from `modern` package classes.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
This patch is an attempt to simplifying GroupMetadataManager#consumerGroupHeartbeat and GroupMetadataManager#classicGroupJoinToConsumerGroup by sharing more of the common logic. It slightly change how static members are replaced too. Now, we generate the records to replace the member and then we update the member if needed.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Justine Olshan <jolshan@confluent.io>
Abstracted code for 2 classes `ConsumerGroup` and `ConsumerGroupMember` to `ModernGroup` and `ModernGroupMember` respectively. The new abstract classes are created to share common functionality with `ShareGroup` and `ShareGroupMember` which are being introduced with KIP-932.
The patch is majorly code refactoring from existing classes to abstract classes. Also created a new package called `modern` where `MemberState` class is moved, in upcoming patches, I will move common classes for `Share` and `Consumer` Group in `modern` package itself.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
When a write operation does not have any records, the coordinator runtime checked whether the state machine is caught-up to decide whether the operation should wait until the state machine is committed up to the operation point or the operation should be completed. The current implementation assumes that there will always be a pending write operation waiting in the deferred queue when the state machine is not fully caught-up yet. This is true except when the state machine is just loaded and not caught-up yet.
This patch fixes the issue by always comparing the last written offset and the last committed offset.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
In `GroupMetadataManager#toTopicPartitions`, we generate a list of `ConsumerGroupHeartbeatRequestData.TopicPartitions` from the input deserialized subscription. Currently the input subscription is `ConsumerPartitionAssignor.Subscription`, where the topic partitions are stored as (topic-partition) pairs, whereas in `ConsumerGroupHeartbeatRequestData.TopicPartitions`, we need the topic partitions to be stored as (topic-partition list) pairs.
`ConsumerProtocolSubscription` is an intermediate data structure in the deserialization where the topic partitions are stored as (topic-partition list) pairs. This pr uses `ConsumerProtocolSubscription` instead as the input subscription to make `toTopicPartitions` more efficient.
Reviewers: David Jacot <djacot@confluent.io>
This patch:
- changes the order of the checks in `validateOnlineDowngrade`, so that only when the last member using the consumer protocol leave and the group still has classic member(s), `online downgrade is disabled` is logged if the policy doesn't allow downgrade.
- changes the session timeout in `convertToConsumerGroup` from `consumerGroupSessionTimeoutMs` to `member.classicProtocolSessionTimeout().get()`.
Reviewers: David Jacot <djacot@confluent.io>
This patch is the continuation of https://github.com/apache/kafka/pull/15964. It introduces the records coalescing to the CoordinatorRuntime. It also introduces a new configuration `group.coordinator.append.linger.ms` which allows administrators to chose the linger time or disable it with zero. The new configuration defaults to 10ms.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
Fix the following NPE:
```
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.coordinator.group.assignor.MemberAssignment.targetPartitions()" because the return value of "java.util.Map.get(Object)" is null
at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.canMemberParticipateInReassignment(GeneralUniformAssignmentBuilder.java:248)
at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.balance(GeneralUniformAssignmentBuilder.java:336)
at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.buildAssignment(GeneralUniformAssignmentBuilder.java:157)
at org.apache.kafka.coordinator.group.assignor.UniformAssignor.assign(UniformAssignor.java:84)
at org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.build(TargetAssignmentBuilder.java:302)
at org.apache.kafka.coordinator.group.GroupMetadataManager.updateTargetAssignment(GroupMetadataManager.java:1913)
at org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:1518)
at org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:2254)
at org.apache.kafka.coordinator.group.GroupCoordinatorShard.consumerGroupHeartbeat(GroupCoordinatorShard.java:308)
at org.apache.kafka.coordinator.group.GroupCoordinatorService.lambda$consumerGroupHeartbeat$0(GroupCoordinatorService.java:298)
at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.lambda$run$0(CoordinatorRuntime.java:769)
at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.withActiveContextOrThrow(CoordinatorRuntime.java:1582)
at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.access$1400(CoordinatorRuntime.java:96)
at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.run(CoordinatorRuntime.java:767)
at org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.handleEvents(MultiThreadedEventProcessor.java:144)
at org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.run(MultiThreadedEventProcessor.java:176)
```
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Justine Olshan <jolshan@confluent.io>
This patch renames the uniform assignor's builders to match the `SubscriptionType` which is used to determine which one is called. It removes the abstract class `AbstractUniformAssignmentBuilder` which is not necessary anymore. It also applies minor refactoring.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This patch moves the `PartitionAssignor` interface and all the related classes to a newly created `group-coordinator/api` module, following the pattern used by the storage and tools modules.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
During online migration, there could be ConsumerGroup that has members that uses the classic protocol. In the current implementation, `STALE_MEMBER_EPOCH` could be thrown in ConsumerGroup offset fetch/commit validation but it's not supported by the classic protocol. Thus this patch changed `ConsumerGroup#validateOffsetCommit` and `ConsumerGroup#validateOffsetFetch` to ensure compatibility.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
This patch reworks the `PartitionAssignor` interface to use interfaces instead of POJOs. It mainly introduces the `MemberSubscriptionSpec` interface that represents a member subscription and changes the `GroupSpec` interfaces to expose the subscriptions and the assignments via different methods.
The patch does not change the performance.
before:
```
Benchmark (memberCount) (partitionsToMemberRatio) (topicCount) Mode Cnt Score Error Units
TargetAssignmentBuilderBenchmark.build 10000 10 100 avgt 5 3.462 ± 0.687 ms/op
TargetAssignmentBuilderBenchmark.build 10000 10 1000 avgt 5 3.626 ± 0.412 ms/op
JMH benchmarks done
```
after:
```
Benchmark (memberCount) (partitionsToMemberRatio) (topicCount) Mode Cnt Score Error Units
TargetAssignmentBuilderBenchmark.build 10000 10 100 avgt 5 3.677 ± 0.683 ms/op
TargetAssignmentBuilderBenchmark.build 10000 10 1000 avgt 5 3.991 ± 0.065 ms/op
JMH benchmarks done
```
Reviewers: David Jacot <djacot@confluent.io>
The time taken to compute a new assignment is critical. This patches extending the existing logging to log it too. This is very useful information to have.
Reviewers: Luke Chen <showuon@gmail.com>
Fix the bug where the group downgrade to a classic one when a member leaves, even though the consumer group size is still larger than `classicGroupMaxSize`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
This patch is a small refactoring which mainly aims at avoid to construct a copy of the new target assignment in the TargetAssignmentBuilder because the copy is not used by the caller. The change relies on the exiting tests and it does not really have an impact on performance (e.g. validated with TargetAssignmentBuilderBenchmark).
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
KIP-932 introduces share groups to go alongside consumer groups. Both kinds of group use server-side assignors but it is unlikely that a single assignor class would be suitable for both. As a result, the KIP introduces specific interfaces for consumer group and share group partition assignors.
This PR introduces only the consumer group interface, `o.a.k.coordinator.group.assignor.ConsumerGroupPartitionAssignor`. The share group interface will come in a later release. The existing implementations of the general `PartitionAssignor` interface have been changed to implement `ConsumerGroupPartitionAssignor` instead and all other code changes are just propagating the change throughout the codebase.
Note that the code in the group coordinator that actually calculates assignments uses the general `PartitionAssignor` interface so that it can be used with both kinds of group, even though the assignors themselves are specific.
Reviewers: Apoorv Mittal <amittal@confluent.io>, David Jacot <djacot@confluent.io>
This patch implements the LeaveGroup API to the consumer groups that are in the mixed mode.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
This patch speeds up the computation of the unassigned partitions by exposing the inverted target assignment. It allows the assignor to check whether a partition is assigned or not.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit. Otherwise, we default the write limit to the min buffer size of 16384 for the write limit. This causes the coordinator to threw RecordTooLargeException even when it's under the 1MB max batch size limit.
Reviewers: David Jacot <djacot@confluent.io>
This patch aims to remove the data structure that stores the conversion from topic names to topic ids which was taking time similar to the actual assignment computation. Instead, we reuse the already existing ConsumerGroupMember.subscribedTopicNames() and do the conversion to topic ids when the iterator is requested.
Reviewers: David Jacot <djacot@confluent.io>
This patch implements the heartbeat api to the members that use the classic protocol in a ConsumerGroup.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
DelayEventAccumulator should return immediately if there are no events in the queue. Also removed some unused fields inside EventProcessorThread.
Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
We have discovered during large scale performance tests that the current write path of the new coordinator does not scale well. The issue is that each write operation writes synchronously from the coordinator threads. Coalescing records into bigger batches helps drastically because it amortizes the cost of writes. Aligning the batches with the snapshots of the timelines data structures also reduces the number of in-flight snapshots.
This patch is the first of a series of patches that will bring records coalescing into the coordinator runtime. As a first step, we had to rework the PartitionWriter interface and move the logic to build MemoryRecords from it to the CoordinatorRuntime. The main changes are in these two classes. The others are related mechanical changes.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch implements the sync group api for the consumer groups that are in the mixed mode. In classicGroupSyncToConsumerGroup, the assignedPartitions calculated in the JoinGroup will be returned as the assignment in the sync response and the member session timeout will be rescheduled.
Reviewers: David Jacot <djacot@confluent.io>
This patch removes the usage of the Stream API in CoordinatorRecordHelpers. I saw it in a couple of profiles so it is better to remove it.
Reviewers: Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
The heartbeat api to the consumer group with classic protocol members schedules the session timeout. At present, there's no way to get the classic member session timeout in heartbeat to consumer group.
This patch stores the session timeout into the ClassicMemberMetadata in ConsumerGroupMemberMetadataValue and update it when it's provided in the join request.
Reviewers: David Jacot <djacot@confluent.io>
This patch deprecates `offsets.commit.required.acks` in Apache Kafka 3.8 as described in KIP-1041: https://cwiki.apache.org/confluence/x/9YobEg.
Reviewers: Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Write events create and add a TimerTask to schedule the timeout operation. The issue is that we pile up the number of timer tasks which are essentially no-ops if replication was successful. They stay in memory for 15 seconds (default write timeout) and as the rate of write increases, the impact on memory usage increases.
Instead, cancel the corresponding write timeout task when the write event is committed to the log. This also applies to complete transaction events.
Reviewers: David Jacot <djacot@confluent.io>
The patch implements JoinGroup API for the new consumer groups. It allow members using the classic rebalance protocol with the consumer embedded protocol to join a new consumer group.
Reviewers: David Jacot <djacot@confluent.io>
ConsumerGroupPartitionMetadataValue.Epoch is not used anywhere so we can remove it. Note that we already have non-backward compatible changes lined up for 3.8 so it is fine to do it.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
While reviewing https://github.com/apache/kafka/pull/15785, I noticed that the member is added to the group directly in `ConsumerGroup#getOrMaybeCreateMember`. This does not hurt but confuses people because the state must not be mutated at this point. It should only be mutated when records are replayed. I think that it is better to remove it in order to make it clear.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
We are seeing flaky test in `testConsumerGroupHeartbeatWithStableClassicGroup` where the error is caused by the different ordering in the expected and actual values. The patch sorts the topic partition list in the records to fix the issue.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Igor Soarez <soarez@apple.com>, David Jacot <djacot@confluent.io>
Online downgrade from a consumer group to a classic group is triggered when the last consumer that uses the consumer protocol leaves the group. A rebalance is manually triggered after the group conversion. This patch adds consumer group downgrade validation and conversion.
Reviewers: David Jacot <djacot@confluent.io>
This patch introduces the conversion from a classic group to a consumer group when a member joins with the new consumer group protocol (epoch is 0) but only if the conversion is enabled.
Reviewers: David Jacot <djacot@confluent.io>
This patch adds the `group.consumer.migration.policy` config which controls how consumer groups can be converted from classic group to consumer group and vice versa. The config is kept as an internal one while we develop the feature.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
When implementing the group type conversion from a classic group to a consumer group, if the replay of conversion records fails, the group should be reverted back including its timeouts.
A possible solution is to keep all the classic group timeouts and add a type check to the timeout operations. If the group is successfully upgraded, it won't be able to pass the type check and its operations will be executed without actually doing anything; if the group upgrade fails, the group map will be reverted and the timeout operations will be executed as is.
We've already have group type check in consumer group timeout operations. This patch adds similar type check to those classic group timeout operations.
Reviewers: David Jacot <djacot@confluent.io>
- Use `Empty` instead of 'none' when referring to `Optional` values.
- `Headers.lastHeader` returns `null` when no header is found.
- Fix minor spelling mistakes.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
KIP-890 Part 1 introduced verification of transactions with the
transaction coordinator on the `Produce` and `TxnOffsetCommit` paths.
This introduced the possibility of new errors when responding to those
requests. For backwards compatibility with older clients, a choice was
made to convert some of the new retriable errors to existing errors that
are expected and retried correctly by older clients.
`NETWORK_EXCEPTION` was forgotten about and not converted, but can occur
if, for example, the transaction coordinator is temporarily refusing
connections. Now, we convert it to:
* `NOT_ENOUGH_REPLICAS` on the `Produce` path, just like the other
retriable errors that can arise from transaction verification.
* `COORDINATOR_LOAD_IN_PROGRESS` on the `TxnOffsetCommit` path. This
error does not force coordinator lookup on clients, unlike
`COORDINATOR_NOT_AVAILABLE`. Note that this deviates from KIP-890,
which says that retriable errors should be converted to
`COORDINATOR_NOT_AVAILABLE`.
Reviewers: Artem Livshits <alivshits@confluent.io>, David Jacot <djacot@confluent.io>, Justine Olshan <jolshan@confluent.io>
When the group coordinator is under heavy load, the current mechanism to release pending events based on updated high watermark, which consist in pushing an event at the end of the queue, is bad because pending events pay the cost of the queue twice. A first time for the handling of the first event and a second time for the handling of the hwm update. This patch changes this logic to push the hwm update event to the front of the queue in order to release pending events as soon as as possible.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
As a part of KIP-890, we are introducing a new class of Exceptions which when encountered shall lead to Aborting the ongoing Transaction. The following PR introduces the same with client side handling and server side changes.
On client Side, the code attempts to handle the exception as an Abortable error and ensure that it doesn't take the producer to a fatal state. For each of the Transactional APIs, we have added the appropriate handling. For the produce request, we have verified that the exception transitions the state to Aborted.
On the server side, we have bumped the ProduceRequest, ProduceResponse, TxnOffestCommitRequest and TxnOffsetCommitResponse Version. The appropriate handling on the server side has been added to ensure that the new error case is sent back only for the new clients. The older clients will continue to get the old Invalid_txn_state exception to maintain backward compatibility.
Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch enables an empty classic group to be automatically converted to a new consumer group and vice versa.
Reviewers: David Jacot <djacot@confluent.io>
This patch fixes a bug in the logic which decides when a full ConsumerGroupHeartbeat response must be returned to the client. Prior to it, the logic only relies on the `ownedTopicPartitions` field to check whether the response was a full response. This is not enough because `ownedTopicPartitions` is also set in different situations. This patch changes the logic to check `ownedTopicPartitions`, `subscribedTopicNames` and `rebalanceTimeoutMs` as they are the only three non optional fields.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
In https://github.com/apache/kafka/pull/15364, we introduced, thoughtfully, a non-backward compatible record change for the new consumer group protocol. So it is a good opportunity for cleaning unused fields, mainly related to the client side assignor logic which is not implemented yet. It is better to introduce them when we need them and more importantly when we implement it.
Note that starting from 3.8, we won't make such changes anymore. Non-backward compatible changes are still acceptable now because we clearly said that upgrade won't be supported from the KIP-848 EA.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
The patch prevents the main method (the method generating records) from modifying the timeline data structure `groups` by calling `getOrMaybeCreateConsumerGroup` in kip-848 new group coordinator. Only replay methods are able to add the newly created group to `groups`.
Reviewers: David Jacot <djacot@confluent.io>
This patch re-work the reconciliation state machine on the server side with the goal to fix a few issues that we have recently discovered.
* When a member acknowledges the revocation of partitions (by not reporting them in the heartbeat), the current implementation may miss it. The issue is that the current implementation re-compute the assignment of a member whenever there is a new target assignment installed. When it happens, it does not consider the reported owned partitions at all. As the member is supposed to only report its own partitions when they change, the member is stuck.
* Similarly, as the current assignment is re-computed whenever there is a new target assignment, the rebalance timeout, as it is currently implemented, becomes useless. The issue is that the rebalance timeout is reset whenever the member enters the revocation state. In other words, in the current implementation, the timer is reset when there are no target available even if the previous revocation is not completed yet.
The patch fixes these two issues by not automatically recomputing the assignment of a member when a new target assignment is available. When the member must revoke partitions, the coordinator waits. Otherwise, it recomputes the next assignment. In other words, revoking is really blocking now.
The patch also proposes to include an explicit state in the record. It makes the implementation cleaner and it also makes it more extensible in the future.
The patch also changes the record format. This is a non-backward compatible change. I think that we should do this change to cleanup the record. As KIP-848 is only in early access in 3.7 and that we clearly state that we don't plane to support upgrade from it, this is acceptable in my opinion.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
There are a few cases in the group coordinator service where we want to read from or write to each of the known coordinators (each of __consumer_offsets partitions). The current implementation needs to get the list of the known coordinators then schedules the operation and finally aggregate the results. This patch is an attempt to streamline this by adding multi read/write to the runtime.
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
I have noticed the following log when a __consumer_offsets partition immigrate from a broker. It appends because the event is queued up after the event that unloads the state machine. This patch fixes it and fixes another similar one.
```
[2024-02-06 17:14:51,359] ERROR [GroupCoordinator id=1] Execution of UpdateImage(tp=__consumer_offsets-28, offset=13251) failed due to This is not the correct coordinator.. (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)
org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
```
Reviewers: Justine Olshan <jolshan@confluent.io>
`poll(long timeout, TimeUnit unit)` is either used with `Long.MAX_VALUE` or `0`. This patch replaces it with `poll` and `take`. It removes the `awaitNanos` usage.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
Spotted the following log:
```
[2024-02-14 09:59:30,103] INFO [GroupCoordinator id=1] Finished loading of metadata from 39 in __consumer_offsets-4ms with epoch 2 where 39ms was spent in the scheduler. Loaded 0 records which total to 0 bytes. (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)
```
The partition and the time are incorrect. This patch fixes it.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Bruno Cadonna <bruno@confluent.io>
`GroupMetadataManagerTest` class got a little under control. We have too many things defined in it. As a first steps, this patch extracts all the inner classes. It also extracts all the helper methods. However, the logic is not changed at all.
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Justine Olshan <jolshan@confluent.io>
`signalAll` was mistakenly used instead of `signal` when a key become available in the `EventAccumulator`. The fix relies on existing tests.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch adds the support for filtering groups by types (Classic or Consumer) to both the old and the new group coordinators.
Reviewers: David Jacot <djacot@confluent.io>
This patch implements `GroupCoordinator.onPartitionsDeleted` that is called whenever a partition is deleted and must deleted all the offsets related to them. The patch uses a naive approach similar to the one used in the old coordinator. It basically iterates over all the regular end pending offsets and deletes the ones matching the deleted partition set.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch uniformizes the error handling in the GroupCoordinatorService with the aim to reuse the same error translation for all operations. It also ensures that exceptions are unwrapped if needed.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
While using —list —state the current accepted values correspond to the classic group type states. This patch adds the new states introduced by KIP-848. It also make the matching on the server case insensitive.
Co-authored-by: d00791190 <dinglan6@huawei.com>
Reviewers: Ritika Reddy <rreddy@confluent.io>, David Jacot <djacot@confluent.io>
This is the last patch to complete the implementation of the transactional offsets. This patch updates the following paths:
* delete offsets - the patch ensures that a tombstone is written for pending transactional offsets too.
* delete all offsets - the patch ensures that all pending transactional offsets are deleted too.
* expire offsets - the patch ensures that an offset for a partition is not expire is there is a pending transaction.
* replay offset record - the patch ensures that all pending transactional offsets are removed when a tombstone is received.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Dongnuo Lyu <dlyu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
The patch populates the topic name of `ConsumerGroupDescribeResponseData.TopicPartitions` with the corresponding topic id in `ConsumerGroupDescribe`.
Reviewers: David Jacot <djacot@confluent.io>
KIP-714 requires client instance cache in broker which should also have a time-based eviction policy where client instances which are not actively sending metrics should be evicted. KIP mentions This client instance specific state is maintained in broker memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
When transactional offset commits are eventually committed, we must always keep the most recent committed when we have a mix of transactional and regular offset commits. We achieve this by storing the offset of the offset commit record along side the committed offset in memory. Without preserving information of the commit record offset, compaction of the __consumer_offsets topic itself may result in the wrong offset commit being materialized.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk> , David Jacot <djacot@confluent.io>, Nikolay <NIzhikov@gmail.com>
We had a case where a partition got assigned to two members and we found a bug in the partition epochs bookkeeping. Basically, when a member has a partition pending revocation re-assigned to him before the revocation is completed, the partition epoch is lost. Here is an example of such transition:
```
[2024-01-16 12:10:52,613] INFO [GroupCoordinator id=1 topic=__consumer_offsets partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member M2 transitioned from CurrentAssignment(memberEpoch=11, previousMemberEpoch=9, targetMemberEpoch=14, state=revoking, assignedPartitions={}, partitionsPendingRevocation={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 3, 4, 5, 6, 7]}, partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[0, 5]}) to CurrentAssignment(memberEpoch=15, previousMemberEpoch=11, targetMemberEpoch=15, state=stable, assignedPartitions={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 3, 4, 5, 6, 7]}, partitionsPendingRevocation={}, partitionsPendingAssignment={}). (org.apache.kafka.coordinator.group.GroupMetadataManager)
```
This patch fixes the bug and also strengthen the partition epochs bookkeeping to not accept such invalid transitions.
Reviewers: Justine Olshan <jolshan@confluent.io>
The current load summary exposes the time from when the partition load operation is scheduled to when the load completes. We are missing the information of how long the scheduled operation stays in the scheduler. Log that information.
Reviewers: David Jacot <djacot@confluent.io>
This PR creates MetadataVersion.latestTesting to represent the highest metadata version (which may be unstable) and MetadataVersion.latestProduction to represent the latest version that should be used in production. It fixes a few cases where the broker was advertising that it supported the testing versions even when unstable metadata versions had not been configured.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
When a replica is deleted, the unloading procedure of the coordinator is called with an empty leader epoch. However, the current implementation of the new group coordinator throws an exception in this case. My bad. This patch updates the logic to handle it correctly.
We discovered the bug in our testing environment. We will add a system test or an integration test in a subsequent patch to better exercise this path.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch adds `UNSTABLE_OFFSET_COMMIT` errors support in the new group coordinator. `UNSTABLE_OFFSET_COMMIT` errors for partitions with unstable offset commits. Here unstable means that there are ongoing transactions.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch wires the transaction verification in the new group coordinator. It basically calls the verification path before scheduling the write operation. If the verification fails, the error is returned to the caller.
Note that the patch uses `appendForGroup`. I suppose that we will move away from using it when https://github.com/apache/kafka/pull/15087 is merged.
Reviewers: Justine Olshan <jolshan@confluent.io>
When a retryable write operation fails, we retry with the default 500ms backoff. If a custom retry backoff was used to originally schedule the operation, we should retry with the same custom backoff instead of the default.
Reviewers: David Jacot <djacot@confluent.io>
Remove "group-rebalance-rate" and "group-rebalance-count" metrics from the new coordinator as this is not part of KIP-848.
Reviewers: David Jacot <djacot@confluent.io>
People has raised concerned about using `Generic` as a name to designate the old rebalance protocol. We considered using `Legacy` but discarded it because there are still applications, such as Connect, using the old protocol. We settled on using `Classic` for the `Classic Rebalance Protocol`.
The changes in this patch are extremely mechanical. It basically replaces the occurrences of `generic` by `classic`.
Reviewers: Divij Vaidya <diviv@amazon.com>, Lucas Brutschy <lbrutschy@confluent.io>