Commit Graph

442 Commits

Author SHA1 Message Date
Andrew Schofield d4d9f11816
KAFKA-18761: [2/N] List share group offsets with state and auth (#19328)
This PR approaches completion of Admin.listShareGroupOffsets() and
kafka-share-groups.sh --describe --offsets.

Prior to this patch, kafka-share-groups.sh was only able to describe the
offsets for partitions which were assigned to active members. Now, the
Admin.listShareGroupOffsets() uses the persister's knowledge of the
share-partitions which have initialised state. Then, it uses this list
to obtain a complete set of offset information.

The PR also implements the topic-based authorisation checking. If
Admin.listShareGroupOffsets() is called with a list of topic-partitions
specified, the authz checking is performed on the supplied list,
returning errors for any topics to which the client is not authorised.
If Admin.listShareGroupOffsets() is called without a list of
topic-partitions specified, the list of topics is discovered from the
persister as described above, and then the response is filtered down to
only show the topics to which the client is authorised. This is
consistent with other similar RPCs in the Kafka protocol, such as
OffsetFetch.

Reviewers: David Arthur <mumrah@gmail.com>, Sushant Mahajan <smahajan@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
2025-04-04 13:25:19 +01:00
Chirag Wadhwa c3f0890f53
KAFKA-19075: Included other share group dynamic configs in extractShareGroupConfigMap method in ShareGroupConfig (#19348)
This PR includes `share.session.timeout.ms` and
`share.heartbeat.interval.ms` in the `extractShareGroupConfigMap` method
in `ShareGroupConfig`. With this change, the default value of
`share.session.timeout.ms` and `share.heartbeat.interval.ms` for every
group will be set as the value of the static configs
`group.share.session.timeout.ms` and `group.share.heartbeat.interval.ms`
respectively

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-04-03 13:47:25 +01:00
Sushant Mahajan 37f7434eac
KAFKA-18827: Incorporate initializing topics in share group heartbeat [4/N] (#19339)
* Currently, when we get a heartbeat with new share partition
subscriptions, we return an initialize request to the caller which the
caller executes asynchronously as a timer task.
* Meanwhile, if a new heartbeat request comes with same or null
subscription - the same initialize request will be returned since the
`GroupMetadataManager` has no idea about the older in flight request.
* In this PR, we have added a new field to the
`ShareGroupStatePartitionMetadata` record `initializingTopics` where
this information can be recorded in the GMM. Consequently, the
subsequent heartbeats can check this field and not return duplicate
initialize requests.
* If any errors are encountered while initializing by the
`GroupCoordinatorService` an additional method
`uninitializeShareGroupState` has been added which will remove the
requisite info from the `initializingFields`.
* New tests have been added wherever applicable and older ones updated.

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-04-03 09:38:20 +01:00
Chirag Wadhwa 793b47011c
KAFKA-19016: Updated the retention behaviour of share groups to retain them forever (#19343)
This PR makes the required changes for retaining a share group forever
post creation. A new field `shouldExpire` is added to all types
implementing the interface `Group`. It returns false only in case of
`ShareGroup` and true otherwise. A check is also added in
`cleanupGroupMetadata` in `GroupCoordinatorShard` and it proceeds with
group expiration only if `shouldExpire` returns true

Reviewers: Andrew Schofield <aschofield@confluent.io>, PoAn Yang <payang@apache.org>
2025-04-02 11:36:04 +01:00
Apoorv Mittal 4aa81204ff
KAFKA-19018,KAFKA-19063: Implement maxRecords and acquisition lock timeout in share fetch request and response resp. (#19334)
PR add `MaxRecords` to share fetch request and also adds
`AcquisitionLockTimeout` to share fetch response. PR also removes
internal broker config of `max.fetch.records`.

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-04-01 12:23:06 +01:00
Lucas Brutschy e3809682e2
KAFKA-18613: Unit tests for usage of incorrect RPCs (#18383)
In the GroupMetadataManager, we may call an RPC on an incorrect group
type. This adds unit tests to validate the behavior when an RPC is used
on an incorrect group type.

Reviewers: Bill Bejeck <bill@confluent.io>
2025-04-01 08:59:08 +02:00
Lucas Brutschy 6f4d4255a6
KAFKA-18613: Improve test coverage for missing topics (#19189)
Tests for joining with missing source topics,
internal topics, incorrectly partitioned topics,
and stale topologies.

Reviewers: Bill Bejeck <bill@confluent.io>
2025-03-31 21:46:58 +02:00
Lucas Brutschy 6d72677eda
KAFKA-18613: Add StreamsGroupHeartbeat handler in the group coordinator (#19114)
Basic streams group heartbeat handling. The main part of are the unit
tests that make sure that we behave, for the most part, like a consumer
group.

- No support for static membership
- No support for configurations (using constants instead)
- No support for regular expressions

Reviewers: Bill Bejeck <bill@confluent.io>, Bruno Cadonna
<cadonna@apache.org>
2025-03-31 13:21:06 +02:00
Sushant Mahajan eb88e78373
KAFKA-18827: Initialize share group state group coordinator impl. [3/N] (#19026)
* This PR adds impl for the initialize share groups call from the Group
Coordinator perspective.
* The initialize call on persister instance will be invoked by the
`GroupCoordinatorService`, based on the response of the
`GroupCoordinatorShard.shareGroupHeartbeat`. If there is new topic
subscription or member assignment change (topic paritions incremented),
the delta share partitions corresponding to the share group in question
are returned as an optional initialize request.
* The request is then sent to the share coordinator as an encapsulated
timer task because we want the heartbeat response to go asynchronously.
* Tests have been added for `GroupCoordinatorService` and
`GroupMetadataManager`. Existing tests have also been updated.
* A new formatter `ShareGroupStatePartitionMetadataFormatter` has been
added for debugging.

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-03-26 19:40:23 +00:00
Nick Guo 5d2bfb4151
KAFKA-18980 OffsetMetadataManager#cleanupExpiredOffsets should record the number of records rather than topic partitions (#19207)
jira: https://issues.apache.org/jira/browse/KAFKA-18980

We should use the number of `records` rather than topic partitions,but
this is not a bug as the number of `records` should be equal to the
number of topic partitions.

Also, we previously used `expiredPartitions` for logging, but now it is
not being used anymore, so we can remove it.

Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
2025-03-22 19:44:09 +08:00
David Jacot ca20e9cd92
KAFKA-18329; [3/3] Delete old group coordinator (KIP-848) (#19255)
This patch is the third of a series of patches to remove the old group
coordinator. With the release of Apache Kafka 4.0, the so-called new
group coordinator is the default and only option available now.

It removes the old group coordinator and cleans up the
`GroupCoordinator` interface.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2025-03-21 08:07:42 -07:00
David Jacot 0c5e5c5d2d
KAFKA-18329; [2/3] Delete old group coordinator (KIP-848) (#19251)
This patch is the second of a series of patches to remove the old group
coordinator. With the release of Apache Kafka 4.0, the so-called new
group coordinator is the default and only option available now.

The patch removes `group.coordinator.new.enable` (internal config) and
all its usages (integration tests, unit tests, etc.). It also cleans up
`KafkaApis` to remove logic only used by the old group coordinator.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2025-03-21 05:54:41 -07:00
ClarkChen ec3c319c35
MINOR: Rewrite unchecked operations in Mock API (#19071)
We encountered unchecked or unsafe operations in
`GroupMetadataManagerTest.java`, `KTableImplTest.java`, and
`ConfigCommandIntegrationTest.java`.

* Rewrite getArgument of invocation in InvocationOnMock API because the
implementation of InvocationOnMock discards type anyway in in
`GroupMetadataManagerTest.java`.

* Remove unchecked annotations for using mock API without variable
assignment in `KTableImplTest.java`.

<img width="1422" alt="Screenshot 2025-03-02 at 8 50 55 AM"
src="https://github.com/user-attachments/assets/10ff1799-ebaa-499c-9acd-ca3b30484e6d"
/>
  
* Follow-up: https://github.com/mockito/mockito/issues/1609

Update on March 2.
* Fix unchecked cast for KTableImpl in `KTableImplTest.java`.
<img width="1259" alt="Screenshot 2025-03-02 at 5 17 47 PM"
src="https://github.com/user-attachments/assets/a5ffa3d7-4897-43ee-9b5f-26337e2560c5"
/>

Update on March 10.
* Use anyMap instead any for unchecked map type issues.
<img width="1691" alt="Screenshot 2025-03-10 at 9 36 38 AM"
src="https://github.com/user-attachments/assets/9aabc595-e7ba-4e04-81f6-f238d42af5a6"
/>

Pass all testing.
<img width="946" alt="Screenshot 2025-03-10 at 10 10 56 AM"
src="https://github.com/user-attachments/assets/793f67ea-09dc-44af-9d6c-de15531e9e72"
/>

Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-03-18 00:47:34 +08:00
Alieh Saeedi ff785ac251
KAFKA-18651: Add Streams-specific broker configurations (#19176)
This change implements the broker-side configs proposed in KIP-1071.
The configurations implemented by this PR are only those that were specifically aimed to be included in `AK 4.1`.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2025-03-13 18:05:24 +01:00
Andrew Schofield 278a93c45d
KAFKA-18901: [1/N] Improved homogeneous SimpleAssignor (#19142)
The current homogeneous SimpleAssignor for share groups is not very good
at revoking partitions which have previously been assigned when the
number of members increases. This PR improves the situation.

It also fixes the sorting of assignments in `kafka-consumer-groups.sh`
and `kafka-share-groups.sh` so that it sorts partition indices
numerically instead of alphabetically. It also adds the missing number
of partitions column for share groups.
2025-03-11 10:08:31 +00:00
Chirag Wadhwa ef2941154e
KAFKA-18931: added a share group session timeout task when group coordinator is loaded (#19173)
This PR adds `scheduleShareGroupSessionTimeout` for all the persisted
members of a share group when the group coordinator is loaded.

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-03-10 15:43:30 +00:00
Lucas Brutschy fc2e3dfce9
MINOR: Disallow unused local variables (#18963)
Recently, we found a regression that could have been detected by static
analysis, since a local variable wasn't being passed to a method during
a refactoring, and was left unused. It was fixed in
[7a749b5](7a749b589f),
but almost slipped into 4.0. Unused variables are typically detected by
IDEs, but this is insufficient to prevent these kinds of bugs. This
change enables unused local variable detection in checkstyle for Kafka.

A few notes on the usage:
- There are two situations in which people actually want to have a local
variable but not use it. First, there are `for (Type ignored:
collection)` loops which have to loop `collection.length` number of
times, but that do not use `ignored` in the loop body. These are
typically still easier to read than a classical `for` loop. Second, some
IDEs detect it if a return value of a function such as `File.delete` is
not being used. In this case, people sometimes store the result in an
unused local variable to make ignoring the return value explicit and to
avoid the squiggly lines.
- In Java 22, unsued local variables can be omitted by using a single
underscore `_`. This is supported by checkstyle. In pre-22 versions,
IntelliJ allows such variables to be named `ignored` to suppress the
unused local variable warning. This pattern is often (but not
consistently) used in the Kafka codebase. This is, however, not
supported by checkstyle.

Since we cannot switch to Java 22, yet, and we want to use automated
detection using checkstyle, we have to resort to prefixing the unused
local variables with `@SuppressWarnings("UnusedLocalVariable")`. We have
to apply this in 11 cases across the Kafka codebase. While not being
pretty, I'd argue it's worth it to prevent bugs like the one fixed in
[7a749b5](7a749b589f).

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Arthur
<mumrah@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Bruno
Cadonna <cadonna@apache.org>, Kirk True <ktrue@confluent.io>
2025-03-10 09:37:35 +01:00
co63oc 3d7ac0c3d1
MINOR: Fix typos in multiple files (#19102)
Fix typos in multiple files

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-03-05 14:27:32 +00:00
David Jacot 08a52c59c1
MINOR: Refactor GroupCoordinatorConfig (#19092)
We defined multiple `ConfigDef`s in `GroupCoordinatorConfig` in then we
merge them in a few places because we always use them together. Having
multiple `ConfigDef`s does not seem necessary to me. This patch changes
it to have just one.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-03-05 01:03:59 -08:00
David Jacot c2014c02b1
MINOR: Small refactor in GroupMetadataManager (#19090)
The code in GroupMetadataManager to request metadata refresh got pretty
ugly with the addition of share and stream groups. It seems preferable
to put the method in the base class.

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-03-04 07:07:18 -08:00
David Jacot 1df4a42b40
KAFKA-18916; Resolved regular expressions must update the group by topics data structure (#19088)
When regular expressions are resolved, they do not update the group by
topics data structure. Hence, topic changes (e.g. deletion) do not
trigger a rebalance of the group.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2025-03-04 06:31:08 -08:00
David Jacot 29115d5931
MINOR: Don't print cleaning group metadata log if empty (#19080)
The new group coordinator prints the following line at fixed interval
even if no groups were deleted:

```
Generated 0 tombstone records while cleaning up group metadata in 0 milliseconds. (org.apache.kafka.coordinator.group.GroupCoordinatorShard)
```

The time component has some value in its own but it may be better to not
print if when there are not records in order to reduce the spam in the
logs.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-03-03 18:10:17 +01:00
Sanskar Jhajharia 3c73c9bdc1
MINOR: Clean up group-coordinator (#19008)
Given that now we support Java 17 on our brokers, this PR replace the
use of :
- `Collections.singletonList()` and `Collections.emptyList()` with
`List.of()`
- `Collections.singletonMap()` and `Collections.emptyMap()` with
`Map.of()`
- `Collections.singleton()` and `Collections.emptySet()` with `Set.of()`

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-03-02 10:45:44 +08:00
Dongnuo Lyu 36f19057e1
KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe (#18989)
This patch filters out the topic describe unauthorized topics from the
ConsumerGroupHeartbeat and ConsumerGroupDescribe response.

In ConsumerGroupHeartbeat, 
- if the request has `subscribedTopicNames` set, we directly check the
authz in `KafkaApis` and return a topic auth failure in the response if
any of the topics is denied.
- Otherwise, we check the authz only if a regex refresh is triggered and
we do it based on the acl of the consumer that triggered the refresh. If
any of the topic is denied, we filter it out from the resolved
subscription.

In ConsumerGroupDescribe, we check the authz of the coordinator
response. If any of the topic in the group is denied, we remove the
described info and add a topic auth failure to the described group.
(similar to the group auth failure)

Reviewers: David Jacot <djacot@confluent.io>, Lianet Magrans
<lmagrans@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>,
Chia-Ping Tsai <chia7712@gmail.com>, TaiJuWu <tjwu1217@gmail.com>,
TengYao Chi <kitingiao@gmail.com>
2025-02-26 13:05:36 -05:00
Lucas Brutschy cb7c54ccd3
KAFKA-18614, KAFKA-18613: Add streams group request plumbing (#18979)
This change implements the basic RPC handling StreamsGroupHeartbeat and
StreamsGroupDescribe. This includes:
 - Adding an option to enable streams groups on the broker
- Passing describe and heartbeats to the right shard of the group
coordinator
- The handler inside the GroupMetadatManager for StreamsGroupDescribe is
fairly trivial, and is included directly in this PR.
- The handler for StreamsGroupHeartbeat is complex and not included in
this PR yet. Instead, a UnsupportedOperationException is thrown.
However, the interface is already defined: The result of a
streamsGroupHeartbeat is a response, together with a list of internal
topics to be created.

The heartbeat implementation inside the `GroupMetadataManager`, which
actually implements the assignment / reconciliation logic, will come in
a follow-up PR. Also, automatic creation of internal topics will be
created in a follow-up PR.

Reviewers: Bill Bejeck <bill@confluent.io>
2025-02-26 16:33:26 +01:00
Abhinav Dixit 4b5a16bf6f
KAFKA-18757: Create full-function SimpleAssignor to match KIP-932 description (#18864)
### About
The current `SimpleAssignor` in AK assigned all subscribed topic
partitions to all the share group members. This does not match the
description given in
[KIP-932](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255070434#KIP932:QueuesforKafka-TheSimpleAssignor).
Here are the rules as mentioned in the KIP by which the assignment
should happen. We have changed the step 3 implementation here due to the
reasons
[described](https://github.com/apache/kafka/pull/18864#issuecomment-2659266502)
-

1. The assignor hashes the member IDs of the members and maps the
partitions assigned to the members based on the hash. This gives
approximately even balance.
2. If any partitions were not assigned any members by (1) and do not
have members already assigned in the current assignment, members are
assigned round-robin until each partition has at least one member
assigned to it.
3. We combine the current and new assignment. (Original rule - If any
partitions were assigned members by (1) and also have members in the
current assignment assigned by (2), the members assigned by (2) are
removed.)

### Tests
The added code has been verified with unit tests and the already present
integration tests.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, TaiJuWu <tjwu1217@gmail.com>
2025-02-26 11:02:23 +00:00
Sanskar Jhajharia b2f0d92c45
[MINOR] Fix the docs for share group metric functions (#19023)
The last commit in this class mistakenly described the functions to be
for Streams Groups. Just a minor update.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Sushant Mahajan <smahajan@confluent.io>
2025-02-25 10:16:00 +00:00
David Jacot 2124511431
MINOR: Rearrange configs in GroupCoordinatorConfigs (#18970)
I was looking into GroupCoordinatorConfigs to review configurations that
we will ship with Apache Kafka 4.0. I found out that it was pretty
disorganised. This patch cleans up the format and re-groups the
configurations which are related.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-02-21 13:20:58 +01:00
Sushant Mahajan c2cb543a1e
KAFKA-18629: Delete share group state RPC group coordinator impl. [3/N] (#18848)
* In this PR, we have added GC side impl to call the delete state share
coord RPC using the persister.
* We will be using the existing `GroupCoordinatorService.deleteGroups`.
The logic will be modified as follows:
* After sanitization, we will call a new
`runtime.scheduleWriteOperation` (not read for consistency) with
callback `GroupCoordinatorShard.sharePartitions`. This will return a Map
of share partitions of the groups which are of SHARE type. We need to
pass all groups as WE CANNOT DETERMINE the type of the group in the
service class.
* Then using the map we will create requests which could be passed to
the persister and make the appropriate calls.
* Once this future completes, we will continue with the existing flow of
group deletion.
* If the group under inspection is not share group - the read callback
should return an empty map.
* Tests have been added wherever applicable.

Reviewers: David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>
2025-02-21 12:13:16 +00:00
Sushant Mahajan c89fd2bff6
KAFKA-18828: Update share group metrics per new init and call mechanism. (#18962)
* Due to recent changes in the way group count metrics are initialized
and updated, the current share group count code has become obsolete as
well as non-functional.
* The update method for the share group count which should be called
from `ShareGroup` cannot be called either. This is because the
constructor has been changed to NOT accept the
`GroupCoordinatorShardMetrics` ref.
* In this PR, we remedy the situation by bringing share group count code
at par with consumer and streams groups.
* Additionally the metric name for share groups with group state
attributes was not aligned with streams and consumer groups as mentioned
in https://github.com/apache/kafka/pull/17011#discussion_r1960309578.
This PR aligns them too.

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-02-20 10:23:37 +00:00
Matthias J. Sax 538a60e1b3
MINOR: disallow rawtypes and fail build (#18877)
Cleanup code to avoid rawtype, and add suppressions where necessary.
Change the build to fail on rawtype warning.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
2025-02-19 13:11:49 -08:00
David Jacot d847d5c464
MINOR: Tweak default group coordinator config & upgrade notes (#18948)
This patch changes the default value of `group.coordinator.threads` to `4` and sets it priority to `HIGH`. This change makes it consistent with how we handle `num.network.threads` and `num.io.threads`. The patch also tweaks the upgrade notes.

Reviewers: Ismael Juma <ismael@juma.me.uk>
2025-02-18 20:05:52 +01:00
Parker Chang ed366e6b89
MINOR: Align assertFutureThrows method signature with JUnit conventions (#18825)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Andrew Schofield <aschofield@confluent.io>
2025-02-18 15:56:42 +00:00
Andrew Schofield 6c14f64245
MINOR: Rename NoOpShareStatePersister for consistency (#18933)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-02-18 14:07:59 +00:00
Lucas Brutschy d0c65a1fd2
KAFKA-18730: Add replaying streams group state from offset topic (#18809)
Adds streams group to the GroupMetadataManager, and implements loading
the records from the offset topic into state. The state also contains
two timers (rebalance timeout and session timeout) that are started
after the group coordinator has been loaded.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bill@confluent.io>
2025-02-17 16:13:21 +01:00
Andrew Schofield 952113e8e0
KAFKA-16720: Support multiple groups in DescribeShareGroupOffsets RPC (#18834)
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
2025-02-13 18:27:05 +00:00
Lucas Brutschy c70b7c4b9e
KAFKA-18323: Add StreamsGroup class (#18729)
Implements a memory model for representing streams groups in the group coordinator, as well as group count and rebalance metrics.

Reviewers: Bill Bejeck <bill@confluent.io>, Bruno Cadonna <bruno@confluent.io>
2025-02-12 11:01:53 +01:00
David Jacot be89ce5f61
MINOR: Accept specifying consumer group assignors by their short names (#18832)
At the moment, we require specifying builtin server side assignors by their full class name. This is not convenient and also exposed their full class name as part of our public API. This patch changes it to accept specifying builtin server side assignor by their short name (uniform or range) while continuing to accept full class name for customer assignors.

Reviewers: Jeff Kim <jeff.kim@confluent.io>
2025-02-10 02:58:44 -08:00
Sanskar Jhajharia 7dbed2f6e8
[KAFKA-16720] AdminClient Support for ListShareGroupOffsets (2/2) (#18671)
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Sushant Mahajan <smahajan@confluent.io>, Andrew Schofield <aschofield@confluent.io>
2025-02-05 14:38:09 +00:00
Lucas Brutschy 4ca24a7dbf
KAFKA-18325: Add TargetAssignmentBuilder (#18676)
A class to build a new target assignment based on the provided parameters. As a result, it yields the records that must be persisted to the log and the new member assignments as a map.

Compared to the feature branch, I extended the unit tests (testing also standby and warm-up task logic) and adopted simplifications due to the TasksTuple class.

Reviewers: Bruno Cadonna <cadonna@apache.org>, Bill Bejeck <bbejeck@apache.org>
2025-02-03 17:35:28 +01:00
Dongnuo Lyu 1a106e4538
KAFKA-18655: Implement the consumer group size counter with scheduled task (#18717)
During testing we discovered that the empty group count is not updated in group conversion, but when the new group is transition to other state, the empty group count is decremented. This could result in negative empty group count.

We can have a new consumer group count implementation that follows the pattern we did for the classic group count. The timeout task periodically refreshes the metrics based on the current groups soft state.

Reviewers: Jeff Kim <jeff.kim@confluent.io>
2025-02-03 10:50:21 -05:00
David Jacot bf05d2c914
KAFKA-18672; CoordinatorRecordSerde must validate value version (#18749)
CoordinatorRecordSerde does not validate the version of the value to check whether the version is supported by the current version of the software. This is problematic if a future and unsupported version of the record is read by an older version of the software because it would misinterpret the bytes. Hence CoordinatorRecordSerde must throw an error if the version is unknown. This is also consistent with the handling in the old coordinator.

Reviewers: Jeff Kim <jeff.kim@confluent.io>
2025-02-03 02:19:27 -08:00
Alieh Saeedi eb01221dc0
KAFKA-17125: Streams Sticky Task Assignor (#18652)
Implements streams sticky assignor on the broker-side.

Reviewers: Bill Bejeck <bbejeck@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>
2025-02-03 10:43:26 +01:00
David Jacot d19b605210
KAFKA-18320; Ensure that assignors are at the right place (#18750)
The full class name of the assignors if part of our public api. Hence, we should ensure that they are not changed by mistake. This patch adds a unit test verifying them.

Reviewers: Sean Quah <squah@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
2025-01-31 07:51:28 -08:00
Lucas Brutschy 56e50120be
KAFKA-18621: Add StreamsCoordinatorRecordHelpers (#18669)
A class with helper methods to create records stored in the __consumer_offsets topic.

Compared to the feature branch, I added unit tests (most functions were not tested) and adopted the new interface for constructing coordinator records introduced by David.

Reviewers: Bruno Cadonna <cadonna@apache.org>
2025-01-30 09:28:45 +01:00
Lucas Brutschy aea699bdef
KAFKA-18324: Add CurrentAssignmentBuilder (#18476)
Implements the current assignment builder, analogous to the current assignment builder of consumer groups. The main difference is the underlying assigned resource, and slightly different logic around process IDs: We make sure to move a task only to a new client, once the task is not owned anymore by any client with the same process ID (sharing the same state directory) - in any role (active, standby or warm-up).

Compared to the feature branch, the main difference is that I refactored the separate treatment of active, standby and warm-up tasks into a compound datatype called TaskTuple (which is used in place of the more specific Assignment class). This also has effects on StreamsGroupMember.

Reviewers: Bruno Cadonna <cadonna@apache.org>, Bill Bejeck <bbejeck@apache.org>
2025-01-23 17:35:03 +01:00
Andrew Schofield 9da516b1a9
KAFKA-18392: Ensure client sets member ID for share group (#18649)
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Lianet Magrans <lmagrans@confluent.io>
2025-01-22 08:57:40 +00:00
David Jacot b368c38684
KAFKA-18302; Update CoordinatorRecord (#18512)
This patch does a few things:
1) Replace ApiMessageAndVersion by ApiMessage in CoordinatorRecord for the key
2) Leverage the fact that ApiMessage exposes the apiKey. Hence we don't need to specify the key anymore.

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-01-21 18:11:26 +01:00
Lucas Brutschy 7d39ba1ef8
KAFKA-18311: Internal Topic Manager (5/5) (#18442)
The internal topic manager takes a requested topology and returns a configured topology, as well as any internal topics that need to be created.

Shares with the client-side "InternalTopicManager" the name only.

Reviewers: Bruno Cadonna <cadonna@apache.org>
2025-01-21 09:42:07 +01:00
David Jacot 76bf38a4fd
KAFKA-18604; Update transaction coordinator (#18636)
This patch updates the transaction coordinator record to use the new coordinator record definition.

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-01-21 08:36:23 +01:00
Dongnuo Lyu 9026b6c6c0
KAFKA-18318: Add logs for online/offline migration indication (#18406)
Add some logs when offline/online happens.

Reviewers: David Jacot <djacot@confluent.io>
2025-01-17 05:30:32 -08:00
Sean Quah 65b93fa8d5
KAFKA-18150; Downgrade group on classic leave of last consumer member (#18224)
Reviewers: David Jacot <djacot@confluent.io>
2025-01-17 05:27:39 -08:00
Lianet Magrans a9688b9507
MINOR: Improve doc for config group.consumer.max.size (#18533)
Reviewers: David Jacot <david.jacot@gmail.com>
2025-01-14 21:36:14 +01:00
Ismael Juma d4aee71e36
KAFKA-18465: Remove MetadataVersions older than 3.0-IV1 (#18468)
Apache Kafka 4.0 will only support KRaft and 3.0-IV1 is the minimum version supported by KRaft. So, we can assume that Apache Kafka 4.0 will only communicate with brokers that are 3.0-IV1 or newer.

Note that KRaft was only marked as production-ready in 3.3, so we could go further and set the baseline to 3.3. I think we should have that discussion, but it made sense to start with the non controversial parts.

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <david.jacot@gmail.com>
2025-01-11 09:42:39 -08:00
Lucas Brutschy b9a952df6c
KAFKA-18311: Enforcing copartitioned topics (4/N) (#18397)
A simplified port of "CopartitionedTopicsEnforcer" from the client-side to the group coordinator.

This class is responsible for enforcing the number of partitions in copartitioned topics. For each copartition group, it checks whether the number of partitions for all topics in the group is the same, and enforces the right number of partitions for repartition topics whose number of partitions is not enforced by the topology.

Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client.

Reviewers: Bruno Cadonna <cadonna@apache.org>
2025-01-10 12:31:09 +01:00
David Jacot 87334e6c2e
KAFKA-18308; Update CoordinatorSerde (#18455)
This patch updates the GroupCoordinatorSerde and the ShareGroupCoordinatorSerde to leverage the CoordinatorRecordType to deserialize records. With this, newly added record are automatically picked up. In other words, the serdes work with all defined records without doing anything.

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-01-10 11:17:30 +01:00
Sean Quah fc48b8166b
MINOR: Clean up classic group tests (#18473)
Use `List.of` and `Set.of` where possible.

Reviewers: David Jacot <djacot@confluent.io>
2025-01-09 23:04:45 -08:00
Lucas Brutschy 3bda9f817d
KAFKA-18311: Configuring repartition topics (3/N) (#18395)
A simplified port of "RepartitionTopics" from the client-side to the group coordinator.

Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client.

Reviewers: Bruno Cadonna <cadonna@apache.org>
2025-01-09 13:56:37 +01:00
Bruno Cadonna 11459ae7e9
KAFKA-18453: Add StreamsTopology class to group coordinator (#18446)
Adds a class that represent the topology of a Streams group sent by a Streams client in the Streams group heartbeat during initialization to the group coordinator.

This topology representation is used together with the partition metadata on the broker to create a configured topology.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
2025-01-09 13:16:03 +01:00
Bruno Cadonna 624dd45809
KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes (#18276)
* KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes

This commit adds the classes to represent a Streams group member in the
consumer coordinator.

Reviewers: Bill Bejeck <bill@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
2025-01-08 17:26:41 +01:00
Lucas Brutschy 2521aee7d4
KAFKA-18311: Configuring changelog topics (2/N) (#18379)
A simplified port of "ChangelogTopics" from the client-side to the group coordinator

Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client.

Reviewers: Bruno Cadonna <cadonna@apache.org>
2025-01-06 17:17:28 +01:00
Lucas Brutschy 35489bfca3
KAFKA-18311: Add internal datastructure for configuring topologies (1/N) (#18268)
Clients in the Streams Rebalance Protocol send an "unconfigured" representation of the topology to the broker. That is, the number of input topics and (some) internal topics is not fixed, regular expressions are not resolved. The broker takes this description of the topology and, together with the current state of the topics on the broker, derives a ConfiguredTopology. The configured topology is what is being returned from StreamsGroupDescribe, and has all number of partitions defined, and regular expressions resolved. The configured topology also contains missing internal topics that need to be created, and potentially configuration errors, such as missing source topics.

In this change, we add the internal data structures for representing the configured topology. They differ in some details from the data structures used in the RPCs. Most importantly, they can be evolved independently of the public interface.

Reviewers: Bruno Cadonna <cadonna@apache.org>
2025-01-06 13:51:00 +01:00
Ismael Juma d6f24d3665
Use `instanceof` pattern to avoid explicit cast (#18373)
This feature was introduced in Java 16.

Reviewers: David Arthur <mumrah@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
2025-01-02 09:32:51 -08:00
Lucas Brutschy dd0fd55316
KAFKA-18319: Add task assignor interfaces (#18270)
Introduces interfaces for defining task assignors. Task assignors are pure functions, mapping the state of the group and a topology to a target assignment. We include a mock assignor, which we will be able to use when testing / benchmarking without the complexities of the sticky task assignor and the high-availability task assignor. We may remove the mock assignor in before the streams rebalance protocol goes GA.

The consumer groups introduce these interfaces to establish a clear separation between the group coordinator code and the pluggable assignors, which may live outside the group coordinator code. We have removed pluggable assignors in KIP-1071, but I think it still makes sense to keep these interfaces for having a clean interface for people to code against. This will pay off, if we plan on making the task assignors pluggable later.

Reviewers: Bill Bejeck <bbejeck@gmail.com>, David Jacot <djacot@confluent.io>
2025-01-02 16:31:35 +01:00
Andrew Schofield 0344f8f5ae
KAFKA-18273: KIP-1099 verbose display share group options (#18259)
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
2025-01-02 09:12:52 +00:00
Ismael Juma fe56fc98fa
KAFKA-18269: Remove deprecated protocol APIs support (KIP-896, KIP-724) (#18218)
Included in this change:
1. Remove deprecated protocol api versions from json files.
3. Remove fields that are no longer used from json files (affects ListOffsets, OffsetCommit, DescribeConfigs).
4. Remove record down-conversion support from KafkaApis.
5. No longer return `Errors.UNSUPPORTED_COMPRESSION_TYPE` on the fetch path[1].
6. Deprecate `TopicConfig. MESSAGE_DOWNCONVERSION_ENABLE_CONFIG` and made the relevant
configs (`message.downconversion.enable` and `log.message.downcoversion.enable`) no-ops since
down-conversion is no longer supported. It was an oversight not to deprecate this via KIP-724.
7. Fix `shouldRetainsBufferReference` to handle null request schemas for a given version.
8. Simplify producer logic since it only supports the v2 record format now.
9. Fix tests so they don't exercise protocol api versions that have been removed.
10. Add upgrade note.

Testing:
1. System tests have a lot of failures, but those tests fail for trunk too and I didn't see any issues specific to this change - it's hard to be sure given the number of failing tests, but let's not block on that given the other testing that has been done (see below).
3. Java producers and consumers with version 0.9-0.10.1 don't have api versions support and hence they fail in an ungraceful manner: the broker disconnects and the clients reconnect until the relevant timeout is triggered.
4. Same thing seems to happen for the console producer 0.10.2 although it's unclear why since api versions should be supported. I will look into this separately, it's unlikely to be related to this PR.
5. Console consumer 0.10.2 fails with the expected error and a reasonable message[2].
6. Console producer and consumer 0.11.0 works fine, newer versions should naturally also work fine.
7. kcat 1.5.0 (based on librdkafka 1.1.0) produce and consume fail with a reasonable message[3][4].
8. kcat 1.6.0-1.7.0 (based on librdkafka 1.5.0 and 1.7.0 respectively) consume fails with a reasonable message[5].
9. kcat 1.6.0-1.7.0 produce works fine.
10. kcat 1.7.1  (based on librdkafka 1.8.2) works fine for consumer and produce.
11. confluent-go-client (librdkafka based) 1.8.2 works fine for consumer and produce.
12. I will test more clients, but I don't think we need to block the PR on that.

Note that this also completes part of KIP-724: produce v2 and lower as well as fetch v3 and lower are no longer supported.

Future PRs will remove conditional code that is no longer needed (some of that has been done in KafkaApis,
but only what was required due to the schema changes). We can probably do that in master only as it does
not change behavior.

Note that I did not touch `ignorable` fields even though some of them could have been
changed. The reasoning is that this could result in incompatible changes for clients
that use new protocol versions without setting such fields _if_ we don't manually
validate their presence. I will file a JIRA ticket to look into this carefully for each
case (i.e. if we do validate their presence for the appropriate versions, we can
set them to ignorable=false in the json file).

[1] We would return this error if a fetch < v10 was used and the compression topic config was set
to zstd, but we would not do the same for the case where zstd was compressed at the producer
level (the most common case). Since there is no efficient way to do the check for the common
case, I made it consistent for both by having no checks.
[2] ```org.apache.kafka.common.errors.UnsupportedVersionException: The broker is too new to support JOIN_GROUP version 1```
[3]```METADATA|rdkafka#producer-1| [thrd:main]: localhost:9092/bootstrap: Metadata request failed: connected: Local: Required feature not supported by broker (0ms): Permanent```
[4]```METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Metadata request failed: connected: Local: Required feature not supported by broker (0ms): Permanent```
[5] `ERROR: Topic test-topic [0] error: Failed to query logical offset END: Local: Required feature not supported by broker`

Reviewers: David Arthur <mumrah@gmail.com>
2024-12-20 19:52:00 -08:00
David Jacot d67379c310
KAFKA-18301; Make coordinator records first class citizen (#18261)
This patch is the first one in a series to improve how coordinator records are managed. It focuses on making coordinator records first class citizen in the generator.
* Introduce `coordinator-key` and `coordinator-value` in the schema;
* Introduce `apiKey` for those. This is done to avoid relying on the version to determine the type.
* It also allows the generator to enforce some rules: the key cannot use flexible versions, the key must have a single version `0`, there must be a key and a value for a given api key, etc.
* It generates an enum with all the coordinator record types. This is pretty handy in the code.

The patch also updates the group coordinators to use those.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Andrew Schofield <aschofield@confluent.io>
2024-12-20 12:16:14 +01:00
Jeff Kim cd08129f3d
MINOR: change the default linger time in the new coordinator (#18274) 2024-12-19 16:54:23 -05:00
Bruno Cadonna 4a6c6d1a1e
KAFKA-18284: Add group coordinator records for Streams rebalance protocol (#18228)
KIP-1071 specifies records that the group coordinator needs to store
into the consumer offset topic to persist the state of a Streams
group. This records are specified in json files from which the actual
classes for the records are generated.

This commit adds the records needed by the group coordinator to store
the state of a Streams group.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
2024-12-19 09:03:33 +01:00
David Jacot bc5b627ebb
MINOR: Mark KIP-848's public apis as stable (#18235)
KIP-848 will be release as GA in Apache Kafka 4.0. Hence we need to mark all the related public apis as stable.

Reviewers: Jeff Kim <jeff.kim@confluent.io>
2024-12-17 20:02:23 +01:00
Peter Lee 220c578521
KAFKA-18014: Add duration based offset reset option for ShareConsumer (#18096)
Kafka consumer supports auto.offset.reset config option, which is used when there is no initial offset in Kafka (or) if the current offset does not exist any more on the server. This config currently supports earliest/latest/none options. Currently consumer resets might force applications to reprocess large amounts of data from earlier offsets. With infinite storage, its beneficial to have a duration based offset reset strategy. This will allow applications to consume/initialise from a fixed duration when there is no initial offset in Kafka.

As part of KIP-932, we are adding support for share consumer groups. Share consumer groups supports dynamic group configuration property share.auto.offset.reset. This is used to set the initial Share-Partition Start Offset (SPSO) based on the share.auto.offset.reset configuration. Currently share.auto.offset.reset supports earliest and latest options to automatically reset the offset

Similar to the Kafka Consumer, we will add support for by_duration: config value for share.auto.offset.reset.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
2024-12-16 08:25:37 +00:00
Kuan-Po Tseng fef625cfa2
KAFKA-18234 DumpLogSegments cannot print ConsumerGroupRegularExpression record (#18173)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-12-15 06:02:02 +08:00
David Jacot 450c10d00c
KAFKA-17507; WriteTxnMarkers API must not return until markers are written and materialized in group coordinator's cache (#18168)
We have observed the below errors in some cluster:

Uncaught exception in scheduled task 'handleTxnCompletion-902667' exception.message:Trying to complete a transactional offset commit for producerId *** and groupId *** even though the offset commit record itself hasn't been appended to the log.

When a transaction is completed, the transaction coordinator sends a WriteTxnMarkers request to all the partitions involved in the transaction to write the markers to them. When the broker receives it, it writes the markers and if markers are written to the __consumer_offsets partitions, it informs the group coordinator that it can materialize the pending transactional offsets in its main cache. The group coordinator does this asynchronously since Apache Kafka 2.0, see this patch.

The above error appends when the asynchronous operation is executed by the scheduler and the operation finds that there are pending transactional offsets that were not written yet. How come?

There is actually an issue is the steps described above. The group coordinator does not wait until the asynchronous operation completes to return to the api layer. Hence the WriteTxnMarkers response may be send back to the transaction coordinator before the async operation is actually completed. Hence it is possible that the next transactional produce to be started also before the operation is completed too. This could explain why the group coordinator has pending transactional offsets that are not written yet.

There is a similar issue when the transaction is aborted. However on this path, we don't have any checks to verify whether all the pending transactional offsets have been written or not so we don't see any errors in our logs. Due to the same race condition, it is possible to actually remove the wrong pending transactional offsets.

PS: The new group coordinator is not impacted by this bug.

Reviewers: Justine Olshan <jolshan@confluent.io>
2024-12-13 13:50:41 -08:00
TengYao Chi b37b89c668
KAFKA-9366 Upgrade log4j to log4j2 (#17373)
This pull request replaces Log4j with Log4j2 across the entire project, including dependencies, configurations, and code. The notable changes are listed below:

1. Introduce Log4j2 Instead of Log4j
2. Change Configuration File Format from Properties to YAML
3. Adds warnings to notify users if they are still using Log4j properties, encouraging them to transition to Log4j2 configurations

Co-authored-by: Lee Dongjin <dongjin@apache.org>

Reviewers: Luke Chen <showuon@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-12-14 01:14:31 +08:00
David Jacot 57737a357f
KAFKA-18188; Admin LeaveGroup should allow removing member using consumer protocol by member id (#18116)
The LeaveGroup API is used by the admin client to remove static members or remove all members from the group. The latter does not work because the API does not allow removing a member using the CONSUMER protocol by member id. Moreover, the response should only include the member id if the member id was included in the request. This patch fixes both issues.

Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Christo Lolov <lolovc@amazon.com>, Jeff Kim <jeff.kim@confluent.io>
2024-12-10 23:17:32 -08:00
Sean Quah 9ae1b0f017
KAFKA-18134; Disallow group upgrades when custom assignors are used (#18046)
Disallow upgrades from classic groups to consumer groups when any member's assignment has non-empty userData.

Reviewers: David Jacot <djacot@confluent.io>
2024-12-09 00:39:22 -08:00
yx9o 38e727fe4d
KAFKA-17864: add descriptions to fields in the agreement (#17681)
Improve descriptive information in Kafka protocol documentation.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
2024-12-07 18:47:11 +00:00
David Jacot 24385a89cf
MINOR: Replace assertUnorderedListEquals by assertUnorderedRecordsEquals in group-coordinator module (#18076)
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-12-07 11:43:25 +08:00
Calvin Liu 755adf8a56
KAFKA-14563: RemoveClient-Side AddPartitionsToTxn Requests (#17698)
Removes the client side AddPartitionsToTxn/AddOffsetsToTxn calls so that the partition is implicitly added as part of KIP-890 part 2. 

This change also requires updating the valid state transitions. The client side can not know for certain if a partition has been added server side when the request times out (partial completion). Thus for TV2, the transition to PrepareAbort is now valid for Empty, CompleteCommit, and CompleteAbort. 

For readability, the V1 and V2 endTransaction methods have been separated. 

Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan <jolshan@confluent.io>, Ritika Reddy <rreddy@confluent.io>
2024-12-06 09:00:04 -08:00
David Jacot b7294d92e1
KAFKA-17593; [11/11] Update subscription type (#18020)
This is the last patch in the series which introduces regular expressions in the new consumer group protocol. The patch ensures that the subscription type of the group takes into account the regular expressions. Please refer to the code to see how they are included.

Reviewers: Sean Quah <squah@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
2024-12-06 06:57:27 -08:00
Andrew Schofield e7d986e48c
KAFKA-17550: DescribeGroups v6 exploitation (#17706)
This PR introduces the DescribeGroups v6 API as part of KIP-1043. This adds an error message for the described groups so that it is possible to get some context on the error. It also changes the behaviour for when the group ID cannot be found but returning error code GROUP_ID_NOT_FOUND rather than NONE.

Reviewers: David Jacot <djacot@confluent.io>
2024-12-05 23:12:24 -08:00
David Jacot 8864cba0e8
MINOR: Update full request condition in ConsumerGroupHeartbeat request handling (#18061)
With the addition of the SubscribedTopicRegex field to the ConsumerGroupHeartbeat request, we need to update the definition of a full request. This patch does so.

Reviewers: Lianet Magrans <lmagrans@confluent.io>
2024-12-05 23:05:22 -08:00
David Jacot e99561e1f3
KAFKA-17593; [10/N] Remove resolved regular expressions when unsubscribed (#17976)
This patch does a few things:
1) It cleans up resolved regular expressions when they are unsubscribed from. It covers the regular leave/fenced paths for the new protocol and it also covers the LeaveGroup API as new members could be removed via the admin API.
2) It ensures that tombstones for  resolved regular expressions are generated on the conversion patch from consumer to classic group.
3) It fixes [KAFKA-18116](https://issues.apache.org/jira/browse/KAFKA-18116) because I faced the same issue while working on the LeaveGroup API. It adds an integration test for this case too.

Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
2024-12-04 23:41:37 -08:00
PoAn Yang fe88232b07
KAFKA-17750 Extend kafka-consumer-groups command line tool to support new consumer group (part 1) (#17958)
1) Bump validVersions of ConsumerGroupDescribeRequest.json and ConsumerGroupDescribeResponse.json to "0-1".

2) Add MemberType field to ConsumerGroupDescribeResponse.json. Default value is -1 (unknown). 0 is for classic member and 1 is for consumer member.

3) When ConsumerGroupMember#useClassicProtocol is true, return MemberType field as 0. Otherwise, return 1.

Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-12-04 06:08:39 +08:00
Kuan-Po Tseng ac8b3dfbf0
KAFKA-18060 new coordinator does not handle TxnOffsetCommitRequest with empty member id when using CONSUMER group (#17914)
There are two issues in KAFKA-18060:

1) New coordinator can't handle the TxnOffsetCommitRequest with empty member id, and TxnOffsetCommitRequest v0-v2 do definitely has an empty member ID, causing ConsumerGroup#validateOffsetCommit to throw an UnknownMemberIdException. This prevents the old producer from calling sendOffsetsToTransaction. Note that TxnOffsetCommitRequest versions v0-v2 are included in KIP-896, so it seems the new coordinator should support that operations

2) The deprecated API Producer#sendOffsetsToTransaction does not use v0-v2 to send TxnOffsetCommitRequest with an empty member ID. Unfortunately, it has been released for a while. Therefore, the new coordinator needs to handle TxnOffsetCommitRequest with an empty member ID for all versions.

Taken from the two issues above, we need to handle empty member id in all API versions when new coordinator are dealing with TxnOffsetCommitRequest.

Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-12-04 02:55:19 +08:00
David Jacot 275b995bf2
KAFKA-18095; Allow a member to join without subscription under new consumer protocol (#18003)
This patch relaxes requiring non-empty subscribed names and regex in the full heartbeat request. Without this, a consumer using client side regexes may not be able to join the group when the regex does not match any topics yet and this is inconsistent with the old protocol. Relaxing the subscribed regex is not strictly required but it seems better to keep it consistent.

Reviewers: Lianet Magrans <lmagrans@confluent.io>
2024-12-03 02:11:36 -08:00
David Jacot 44cb90222c
MINOR: Refactor configs in GroupMetadataManager (#17982)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-12-02 02:26:28 +08:00
David Jacot 24dd11d693
KAFKA-17593; [8/N] Resolve regular expressions (#17864)
This patch introduces the asynchronous resolution of regular expressions. Let me unpack a few details about the implementations:
1) I have decided to finally update all the regular expressions within a consumer group together. My assumption is that the number of regular expressions in a group will be generally small but the number of topics in a cluster is large. Hence grouping has two benefits. Firstly, it allows to go through the list of topics once for all the regular expressions. Secondly, it reduces the number of potential rebalances because all the regular expressions are updated at the same time.
2) An update is triggered when the group is subscribed to at least one regular expressions.
3) An update is triggered when there is no ongoing update.
4) An update is triggered only of the previous one is older than 10s.
5) An update is triggered when the group has unresolved regular expressions.
6) An update is triggered when the metadata image has new topics.

Reviewers: Jeff Kim <jeff.kim@confluent.io>
2024-11-26 08:56:25 -08:00
Dongnuo Lyu 8ccb26de2e
KAFKA-17733: Protocol upgrade should allow empty member assignment in group conversion (#17853)
During conversion from classic to consumer group, if a member has empty assignment (e.g., the member just joined and has never synced), the conversion will fail because of the buffer underflow error when deserializing the member assignment. This patch allows empty assignment while deserializing the member assignment.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
2024-11-19 10:46:07 -08:00
Ken Huang a4cd94e4ef
MINOR: Fix the leak "unknown" `group.coordinator.rebalance.protocols` on documentation (#17834)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
2024-11-19 07:52:31 -08:00
Andrew Schofield 32c887b05e
KAFKA-17949: Introduce GroupState and replace ShareGroupState (#17763)
This PR introduces the unified GroupState enum for all group types from KIP-1043. This PR also removes ShareGroupState and begins the work to replace Admin.listShareGroups with Admin.listGroups. That will complete in a future PR.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
2024-11-19 21:17:12 +05:30
David Jacot a211ee99b5
KAFKA-17593; [7/N] Introduce CoordinatorExecutor (#17823)
This patch introduces the `CoordinatorExecutor` construct into the `CoordinatorRuntime`. It allows scheduling asynchronous tasks from within a `CoordinatorShard` while respecting the runtime semantic. It will be used to asynchronously resolve regular expressions.

The `GroupCoordinatorService` uses a default `ExecutorService` with a single thread to back it at the moment. It seems that it should be sufficient. In the future, we could consider making the number of threads configurable.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
2024-11-19 07:19:22 -08:00
Mickael Maison 389f96aabd
MINOR: Various cleanups in coordinator modules (#17828)
Reviewers: David Jacot <djacot@confluent.io>, Ken Huang <s7133700@gmail.com>
2024-11-19 10:01:05 +01:00
David Jacot 0685b73010
MINOR: Make `group.consumer.migration.policy` public (#17846)
This patch makes `group.consumer.migration.policy` as public config.

Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
2024-11-18 22:46:36 -08:00
David Jacot 6fc6e87382
KAFKA-17593; [6/N] Add new record to GroupCoordinatorRecordSerde (#17791)
This patch extends `GroupCoordinatorRecordSerde` to support the `ConsumerGroupRegularExpression` record.

Reviewers: Jeff Kim <jeff.kim@confluent.io>
2024-11-13 09:08:09 -08:00
David Jacot a802865aad
KAFKA-17593; [5/N] Include resolved regular expressions into target assignment computation (#17750)
This patch does a few things:
* Refactors the `TargetAssignmentBuilder` to use inheritance to differentiate Consumer and Share groups.
* Introduces `UnionSet` to lazily aggregate the subscriptions for a given member. 
* Wires the resolved regular expressions in the `GroupMetadataManager`. At the moment, they are only used when the target assignment is computed.

Reviewers: Sean Quah <squah@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
2024-11-13 06:59:52 -08:00
Colin Patrick McCabe 085b27ec6e
KAFKA-17987 Remove assorted ZK-related files (#17768)
Remove zookeeper files in bin:
- bin/zookeeper-security-migration.sh
- bin/zookeeper-server-start.sh
- bin/zookeeper-server-stop.sh
- bin/zookeeper-shell.sh

Remove files used to configure Kafka in zookeeper mode in config:
- config/server.properties
- config/zookeeper.properties

Remove ZK references from all remaining Kafka configuration files.

Remove ZK references from all log4j.properties files.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-11-13 20:32:18 +08:00
David Jacot 5a5239770f
MINOR: Refactor GroupCoordinator's Assertions (#17755)
This patch cleans up the `Assertions` class in the `group-coordinator` module.

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-11-12 05:30:58 -08:00
David Jacot 8be958d661
MINOR: Cleanup GroupMetadataManagerTest (#17751)
This patch does a few cleanups in GroupMetadataManagerTest:
* Uses `Map.of` where possible.
* Uses `List.of` instead of `Arrays.asList`.
* Fix inconsistent indentation in a few places.

Reviewers: Lianet Magrans <lmagrans@confluent.io>
2024-11-11 22:53:07 -08:00
Chirag Wadhwa 9db5ed00a8
KAFKA-16726: Added share.auto.offset.reset dynamic config for share groups (#17573)
This PR adds another dynamic config share.auto.offset.reset fir share groups.


Reviewers:  Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>,  Abhinav Dixit <adixit@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
2024-11-11 14:36:11 +05:30
David Jacot f7d2a8cd52
MINOR: Cleanup GroupCoordinatorRecordHelpers (#17718)
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-11-09 00:00:03 +08:00
David Jacot 1792b19a05
KAFKA-17593; [4/N] Introduce ConsumerGroupRegularExpression record & related bookkeeping in ConsumerGroup (#17694)
This patch introduces the ConsumerGroupRegularExpression record (key + value) and updates the `GroupMatadataManager` and the `ConsumerGroup` to bookkeep it appropriately. Note that with this change, regular expressions are counted as subscribers in the `subscribedTopicNames` data structure. This is important because the topic metadata of the group is computed based on it.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
2024-11-07 23:16:51 -08:00
David Jacot 5cf91e4cbe
KAFKA-17593; [3/N] Track the number of subscribed members per regular expression in ConsumerGroup (#17653)
This patch adds a data structure to ConsumerGroup to track the number of members subscribed to each regular expressions in the group. This will be useful to know whether a regex is new in the group or whether a regex must be removed from the group.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
2024-11-04 06:39:09 -08:00
David Jacot 64f3ee4c33
KAFKA-17593; [2/N] Update request validation & validate regex (#17651)
This patch does two things:
1) Change the validation of the ConsumerGroupHeartbeat request to accept subscribed topic names and/or subscribed topic regex. At least of them must be set in the first request with epoch 0.
2) Validate the provided regular expression by compiling it.

Co-authored-by: Lianet Magrans <lmagrans@confluent.io>

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
2024-11-04 06:38:09 -08:00
Andrew Schofield 0707c1fde2
KAFKA-17908 Tweak log messages in group metadata manager (#17652)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-11-04 06:16:56 +08:00
Linsiyuan9 af53758746
KAFKA-17814 Use `final` declaration to replace the suppression `this-escape` (#17613)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-11-03 15:00:02 +08:00
TengYao Chi 6f040cabc7
KAFKA-17116 New consumer may not send effective leave group if member ID received after close (#17549)
KIP-1082 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1082%3A+Require+Client-Generated+IDs+over+the+ConsumerGroupHeartbeat+RPC)

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>, Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-11-01 03:03:17 +08:00
Ken Huang 2a46282b2a
KAFKA-17873: Add description to all packages in the public API (#17605)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
2024-10-30 15:41:10 +01:00
Sushant Mahajan 5f92f60bff
KAFKA-17329: DefaultStatePersister implementation (#17270)
Adds the DefaultStatePersister and other supporting classes for managing share state.

* Added DefaultStatePersister implementation. This is the entry point for callers who wish to invoke the share state RPC API.
* Added PersisterStateManager which is used by DefaultStatePersister to manage and send the RPCs over the network.
* Added code to BrokerServer and BrokerMetadataPublisher to instantiate the appropriate persister based on the config value for group.share.persister.class.name. If this is not specified, the DefaultStatePersister will be used. To force use of NoOpStatePersister, set the config to empty. This is an internal config, not to be exposed to the end user. This will be used to factory plug the appropriate persister.
* Using this persister, the internal __share_group_state topic will come to life and will be used for persistence of share group info.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
2024-10-28 14:11:04 -04:00
Apoorv Mittal 0d44415bac
KAFKA-17774: Adding capability to handle max fetch records in Share Fetch (KIP-932) (#17322)
The PR adds capability to restrict the messages in Share Fetch. The max fetch records will be an additional way to limit the number of records sent from broker to client.

In Share Fetch, with min and mx bytes, there exists 3 problems:

1. The max.poll.records client config sends the max number of records defined to application but might have fetched extra becuase of higher max bytes. But the timeout for the sent records has started on the broker.
2. As the application processes records as per max.poll.records, hence those number of records are sent in every acknowledgement. This causes the cache data to be tracked per offset as the batch is broken.
3. The client has to sent the partial acknoledgment batch and cannot piggyback on fetch requests.

To handle the above scenario max fetch records has been added. Once this PR is merged and we define the right methodolgy then KIP will be updated to have max fetch records in share fetch RPC rather as broker config.

Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
2024-10-23 13:21:32 -07:00
Sean Quah 6e8df2951d
MINOR: Log at info level when pending member joins a group (#17495)
When a dynamic member joins a group, it sends two JOIN_GROUP requests.
The first request doesn't have a member id and returns
MEMBER_ID_REQUIRED with a broker-generated member id. The second request
includes the broker-generated member id and joins the group for real.

We emit a log message at info level for the first join request, but not
the second. Log the second join request at info level too.

Reviewers: David Jacot <djacot@confluent.io>
2024-10-21 06:22:03 -07:00
Mickael Maison b54f0c129f
KAFKA-17476 Delete kafka.common.OffsetAndMetadata (#17553)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-10-20 02:15:24 +08:00
Dmitry Werner eb897c6ad5
MINOR: Fix potential NPE (#17541)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-10-19 01:05:45 +08:00
Sean Quah 017da21099
KAFKA-17710; Rework uniform heterogeneous assignor to improve perf (#17385)
Rework the uniform heterogeneous assignor to improve performance, while
preserving the high level ideas and structure from the existing
implementation:
 * The assignor works in 3 stages: importing the previous assignment for
   stickiness, assigning unassigned partitions and iteratively
   reassigning partitions to improve balance.
 * Unassigned partitions are assigned to the subscribers with the least
   number of partitions. This maximizes balance within a single topic.
 * During the iterative rebalancing phase, partitions are reassigned to
   their previous owner if it improves balance (stickiness restoration).
 * During the iterative rebalancing phase, partitions are reassigned to
   the subscriber with the least number of partitions to improve
   balance.

A non-exhaustive list of changes is:
 * The assignment of unassigned partitions and iterative reassignment
   stages now works through partitions topic by topic. Previously
   partitions from topics with the same number of partitions per
   subscriber would be interleaved. Since we iterate topic by topic, we
   can reuse data about topic subscribers.
 * Instead of maintaining TreeSets to find the least loaded subscribers,
   we sort an ArrayList of subscribers once per topic and start filling
   up subscribers, least loaded first. In testing, this approach was
   found to be faster than maintaining PriorityQueues.
 * Implement stickiness restoration by creating a mapping of partitions
   to previous owner and checking against that mapping, instead of
   tracking partition movements during iterative reassignment.
 * Track member partition counts using a plain int array, to avoid
   overhead from boxing and HashMap lookups. Member partition counts are
   accessed very frequently and this needs to be fast. As a consequence,
   we have to number members 0 to M - 1.
 * Bound the iterative reassignment stage to a fixed number of
   iterations. Under some uncommon subscription patterns, the iterative
   reassignment stage converges slowly. In these cases, the iterative
   reassignment stage terminates without producing an optimally balanced
   assignment anyway (see javadoc for balanceTopics).
 * Re-use Maps from the previous assignment where possible,
   ie. introduce a copy-on-write mechanism while computing the new
   assignment.

Reviewers: David Jacot <djacot@confluent.io>
2024-10-17 01:43:29 -07:00
Chirag Wadhwa d8b15ecd40
KAFKA-17756: Added dynamic share group configs share.heartbeat.interval.ms and share.session.timeout.ms (#17450)
This PR adds the 2 configs share.heartbeat.interval.ms and share.session.timeout.ms in GroupConfig. These can be dynamically set for a share group without restarting the server

Reviewers: Andrew Schofield <aschofield@confluent.io>,  Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
2024-10-15 13:36:12 +05:30
Andrew Schofield fef105e5ad
MINOR: Adjust maximum value of group.share.record.lock.duration.ms (#17472)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-10-12 08:57:02 +08:00
Gaurav Narula b03fe66cfe
KAFKA-17759 Remove Utils.mkSet (#17460)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-10-11 21:20:43 +08:00
Chirag Wadhwa f1aa3a555e
KAFKA-17532-2: Moved ShareGroupConfig and added share.record.lock.duration.ms to dynamic configs (#17331)
This PR is the first series in the attempt to add share.record.lock.duration.ms to dynamic group configs. As part of this PR, the ShareGroupConfig has been moved to org.apache.kafka.coordinator.group.modern.share

Reviewers:  Andrew Schofield <aschofield@confluent.io>,  Apoorv Mittal <apoorvmittal10@gmail.com>, Abhinav Dixit <adixit@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
2024-10-10 11:27:59 +05:30
Sean Quah bb6ebd83f9
MINOR: Fix typo and refactor new group coordinator tests (#17072)
This patch fixes a few things:
* Typos.
* Merge the tests for fetchOffsets and fetchAllOffsets together into parameterized tests since they share the same structure.
* Use Topic.GROUP_METADATA_TOPIC_NAME instead of __consumer_offsets in new group coordinator tests.

Reviewers: Ken Huang <s7133700@gmail.com>, David Jacot <djacot@confluent.io>
2024-10-09 07:37:23 -07:00
Dongnuo Lyu 366aa1014c
KAFKA-17317; Validate and maybe trigger downgrade after static member replacement (#17306)
This implementation doesn't change the existing downgrade path.

In `classicGroupJoinToConsumerGroup`, if the group should be downgraded, it will be converted to a classic group at the end of the method. The returned records will be the records from GroupJoin plus the records from conversion. No rebalance will be triggered in the newly converted group.

Reviewers: David Jacot <djacot@confluent.io>
2024-10-07 02:11:16 -07:00
PoAn Yang 18a584c90e
KAFKA-17618; group consumer heartbeat interval should be less than session timeout (#17281)
This patch ensures that the heartbeat interval is smaller than the session timeout.

Reviewers: David Jacot <djacot@confluent.io>
2024-10-07 01:53:26 -07:00
Dongnuo Lyu cbc02e006d
KAFKA-16106; Schedule timeout task to refresh classic group size metric (#17325)
In the existing implementation, If an operation modifying the classic group state fails, the group reverts but the group size counter does not. This creates an inconsistency between the group size metric and the actual group size.

Considering that It will be complicated to rely on the appendFuture to revert the metrics upon the operation failure, this PR introduces a new implementation. A timeout task will periodically refresh the metrics based on the current groups soft state. The refreshing interval is hardcoded to 60 seconds.

Reviewers: David Jacot <djacot@confluent.io>
2024-10-04 00:31:06 -07:00
Sean Quah 99e1d8fbb3
MINOR: Cache topic resolution in TopicIds set (#17285)
Looking up topics in a TopicsImage is relatively slow. Cache the results
in TopicIds to improve assignor performance. In benchmarks, we see a
noticeable improvement in performance in the heterogeneous case.

Before
```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS          1000  avgt    5   36.400 ± 3.004  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS          1000  avgt    5  158.340 ± 0.825  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5    1.329 ± 0.041  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10       HETEROGENEOUS          1000  avgt    5  382.901 ± 6.203  ms/op
```

After
```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS          1000  avgt    5   36.465 ± 1.954  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS          1000  avgt    5  114.043 ± 1.424  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5    1.454 ± 0.019  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10       HETEROGENEOUS          1000  avgt    5  342.840 ± 2.744  ms/op
```

---

Based heavily on https://github.com/apache/kafka/pull/16527.

Reviewers: David Arthur <mumrah@gmail.com>, David Jacot <djacot@confluent.io>
2024-10-03 00:40:25 -07:00
Dongnuo Lyu e1deeb4b91
MINOR: Set default group.consumer.migration.policy to BIDIRECTIONAL (#17295)
This is a small patch to change the default value of group.consumer.migration.policy to BIDIRECTIONAL.

Reviewers: David Jacot <djacot@confluent.io>
2024-09-26 23:13:11 -07:00
David Jacot f8acfa5257
KAFKA-17621; Reduce logging verbosity on ConsumerGroupHeartbeat path (#17288)
While running large scale performance tests, we noticed that the logging on the ConsumerGroupHeartbeat path took a significant amount of CPU. It is mainly due to the very large data structures that we print out. I made a pass on those logs and I switched some of them to debug.

Reviewers: Lianet Magrans <lianetmr@gmail.com>
2024-09-26 11:00:44 -07:00
David Jacot bd94a739ef
KAFKA-17571; Revert "MINOR: Log pending join members (#17219)" (#17274)
This reverts commit 74bebf6e3d.

Reviewers: David Arthur <mumrah@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-09-26 00:04:32 -07:00
Lianet Magrans ab0df20489
KAFKA-17592; Support for SubscribedTopicsRegex in ConsumerGroupHeartbeat RPC (#17257)
This patch includes:
- Bump ConsumerGroupHeartbeatRequest version to include subscribedTopicRegex field
- Introduce new error code for InvalidRegularExpression 
- Bump ConsumerGroupHeartbeatResponse version to support new regex error
- Wire the new field into the GroupMetadataManager when processing HB

Reviewers: David Jacot <djacot@confluent.io>
2024-09-25 00:52:05 -07:00
PoAn Yang bb97d63d41
KAFKA-17578: Remove partitionRacks from TopicMetadata (#17233)
The ModernGroup#subscribedTopicMetadata takes too much memory due to partitionRacks. This is not being used at the moment as the consumer protocol does not support rack aware assignments.

A heap dump from a group with 500 members, 2K subscribed topic partitions shows 654,400 bytes used for partitionRacks. The rest of the ConsumerGroup object holds 822,860 bytes.

Reviewers: David Jacot <djacot@confluent.io>
2024-09-25 00:48:48 -07:00
David Jacot 74bebf6e3d
MINOR: Log pending join members (#17219)
I am still chasing KAFKA-17493. I was able to narrow it down to an issue with the pending join members. This patch logs them in order to help me troubleshooting it further. I will revert this change when the issue is root caused.

Reviewers: David Arthur <mumrah@gmail.com>
2024-09-18 00:34:06 -07:00
David Jacot f7430cf84b
MINOR: Log reason holding the completion of the join phase in the classic protocol (#17197)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-09-16 01:33:58 +08:00
David Jacot 31f79055ce
KAFKA-17306; Soften the validation when replaying tombstones (#16898)
This patch fixes a few buts in the replay logic of the consumer group records:
* The first issue is that the logic assumed that the group or the member exists when tombstones are replayed. Obviously, this is incorrect after a restart. The group or the member may not me there anymore if the __consumer_offsets partitions only contains tombstones for the group or the member. The patch fixes this by considering tombstones as no-ops if the entity does not exist.
* The second issue is that the logic assumed that consumer group records are always in a specific order in the log so the logic was only accepting to create a consumer group when `ConsumerGroupMemberMetadata` record is replayed. This is obviously incorrect too. During the life time of a consumer group, the records may be in different order. The patch fixes this by allowing the creating of a consumer group by any record.
* The third issue is that it is possible to replay offset commit records for a specific consumer group before the consumer group is actually created while replying its records. By default the OffsetMetadataManager creates a simple classic group to hold those offset commits. When the consumer offset records are finally replayed, the logic will fail because a classic group already exists. The patch fixes this by converting a simple classic group when records for a consumer group are replayed.

All those combinations are hard to test with unit tests. This patch adds an integration tests which reproduces some of those interleaving of records. I used them to reproduce the issues describe above.

Reviewers: TengYao Chi <kitingiao@gmail.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-09-10 07:28:36 -07:00
Sushant Mahajan 821c10157d
KAFKA-17367: Introduce share coordinator [2/N] (#17011)
Introduces the share coordinator. This coordinator is built on the new coordinator runtime framework. It 
is responsible for persistence of share-group state in a new internal topic named "__share_group_state".
The responsibility for being a share coordinator is distributed across the brokers in a cluster. 

Reviewers: David Arthur <mumrah@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
2024-09-09 20:01:24 -04:00
David Jacot 9abb8d3b3c
MINOR: Set `group.coordinator.rebalance.protocols` to `classic,consumer` by default (#17057)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-09-05 13:50:20 +08:00
Andrew Schofield b0d0956b20
KAFKA-17425: Improve coexistence of consumer and share groups (#17039)
This PR ensures that using the various group RPCs work properly when issued against the wrong type of group, such as DescribeConsumerGroups for a share group, or ConsumerGroupHeartbeat for a share group. There are no changes to the RPC error codes required.

The significant code changes are:

Making sure that the group coordinator does not assume that only classic and consumer groups exist. This was the cause of a ClassCastException when ConsumerGroupHeartbeat was being used against a share group.
Making sure that committing offsets to a share group fails with GroupIdNotFoundException rather than java.lang.UnsupportedOperation. This was the cause of a name collision between a share group and a consumer group when using kafka-consumer-groups.sh --reset-offsets which inadvertently created a consumer group of the same name.

 Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
2024-09-04 00:16:15 +05:30
Sean Quah b8ea409132
MINOR: Log when a consumer group is created by the admin client (#17073)
Reviewers: David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-09-04 02:37:54 +08:00
DL1231 006af8b939
KAFKA-17327; Add support of group in kafka-configs.sh (#16887)
The patch adds support of alter/describe configs for group in kafka-configs.sh.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
2024-08-27 02:16:46 -07:00
Sushant Mahajan 1621f88f06
KAFKA-17367: Share coordinator infra classes [1/N] (#16921)
Introduce ShareCoordinator interface and related classes.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, David Arthur <mumrah@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-08-26 10:09:47 -04:00
David Jacot aaf887d3d9
KAFKA-14048; [2/2] Use the new group coordinator by default in 4.0 (#16945)
This patch makes the new group coordinator, introduced as part of KIP-848, the default. This means that any KRaft cluster created from trunk defaults to using the new group coordinator. This includes all the integration tests which do not specify it. This patch also changes the default in system tests.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-08-26 01:14:26 -07:00
Jeff Kim ced34e3176
KAFKA-16379; Coordinator event queue, processing, flush, purgatory time histograms (#16949)
This patch introduces a wrapper around [HdrHistogram](https://github.com/HdrHistogram/HdrHistogram) to use for group coordinator histograms, event queue time, event processing time, flush time, and purgatory time.

Reviewers: David Jacot <djacot@confluent.io>
2024-08-23 04:53:22 -07:00
David Jacot 944c1353a9
KAFKA-16736 Remove `offsets.commit.required.acks` in 4.0 (#16938)
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-08-21 11:14:04 +08:00
David Jacot de67ac6a9a
MINOR: Update `getOrMaybeCreateClassicGroup` to only throw GroupIdNotFoundException (#35) (#16919)
This patch updates getOrMaybeCreateClassicGroup to only throw GroupIdNotFoundException as we did for other internal methods. The callers are responsible for translating the error to the appropriate one depending on the context. There is only one case.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-08-20 04:36:42 -07:00
Sushant Mahajan c5e9154672
KAFKA-17342 Moved common coordinator code to separate module (#16883)
There is a lot of code in group-coordinator which is not share/consumer/classic group specific.

Since we are introducing a share-coordinator as part of KIP-932 (in a new module), it would make sense to get the common coordinator functionality into a separate common coordinator module so that share-coordinator need not depend on group-coordinator.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-08-18 21:48:44 +08:00
DL1231 3a0efa2845
KAFKA-14510; Extend DescribeConfigs API to support group configs (#16859)
This patch extends the DescribeConfigs API to support group configs.

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
2024-08-14 06:37:57 -07:00
Andrew Schofield 8d29bc1fa8
KAFKA-17247 Revised share group record schemas (#16786)
In KIP-932, the group coordinator does not persist assignments for share groups. While this sounds like a good idea in terms of minimising overhead for data which doesn't strictly need to be recoverable, it significantly adds to the complexity of working with the coordinator framework.

This PR revises the definitions of the share group record schemas following more closely the schemas used for consumer groups, and eliminating the need to maintain soft state alongside the group coordinator's timeline structure.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-08-14 02:21:00 +08:00
Andrew Schofield deab703e43
KAFKA-17292: Introduce share coordinator protocol config (#16847)
Add the "share" group coordinator rebalance protocol as the way to enable KIP-932. It is also necessary to turn on the new group coordinator.

Reviewers:  Apoorv Mittal <apoorvmittal10@gmail.com>,  Manikumar Reddy <manikumar.reddy@gmail.com>
2024-08-13 08:58:27 +05:30
DL1231 bbdf79e1b4
KAFKA-14511; Extend AlterIncrementalConfigs API to support group config (#15067)
This patch add resources to store and handle consumer group's config.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>, David Jacot <djacot@confluent.io>
2024-08-12 00:40:13 -07:00
Apoorv Mittal 126b25b51d
KAFKA-17288 Removed tracking partition member epoch (KIP-932) (#16828)
Partition epochs are tracked for consumer groups where epoch is the current assigned member epoch. As share groups have partitions shared hence maintaing the partition epochs is not required.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-08-12 07:17:46 +08:00
Dongnuo Lyu ed0e1086a9
KAFKA-17228; Static member using new protocol should always replace the one using the old protocol (#16800)
This patch enables the static member using the old protocol to be replaced in ConsumerGroupHeartbeat even if it hasn't sent a request to leave the group.

This helps in cases where existing static member rejoins and triggers a group upgrade, because the classic static consumer doesn't send leave group request before shutting down.

Reviewers: TengYao Chi <kitingiao@gmail.com>, David Jacot <djacot@confluent.io>
2024-08-09 01:11:34 -07:00
Sean Quah 7a8edffad1
KAFKA-17267; Don't return REQUEST_TIMED_OUT for OFFSET_FETCHes (#16825)
When handling an OFFSET_FETCH request requiring stable offsets, the new
group coordinator may encounter a timeout under some circumstances, such
as a zombie coordinator or a lagging __consumer_offsets replica that has
not yet dropped out of the ISR. Existing and older clients do not expect
the REQUEST_TIMED_OUT error code won't retry, so remap it to
NOT_COORDINATOR to trigger a coordinator lookup and retry.

Reviewers: David Jacot <djacot@confluent.io>
2024-08-09 01:06:38 -07:00
Josep Prat 4e862c0903
KAFKA-15875: Stops leak Snapshot in public methods (#16807)
* KAFKA-15875: Stops leak Snapshot in public methods

The Snapshot class is package protected but it's returned in
several public methods in SnapshotRegistry.
To prevent this accidental leakage, these methods are made
package protected as well. For getOrCreateSnapshot a new
method called IdempotentCreateSnapshot is created that returns void.
* Make builer package protected, replace <br> with <p>

Reviewers: Greg Harris <greg.harris@aiven.io>
2024-08-08 20:05:47 +02:00