Commit Graph

418 Commits

Author SHA1 Message Date
David Jacot 202e216a60
MINOR: Cleanup hasMemberSubscriptionChanged (#20047)
CI / build (push) Waiting to run Details
Cleanup hasMemberSubscriptionChanged.
Remove unused InvalidRegularExpressionException from the signature.

Reviewers: Sean Quah <squah@confluent.io>, Chia-Ping Tsai
 <chia7712@gmail.com>, TengYao Chi <frankvicky@apache.org>
2025-06-27 14:16:49 +08:00
David Jacot f6a78c4c2b
KAFKA-19246; OffsetFetch API does not return group level errors correctly with version 1 (#19704)
The OffsetFetch API does not support top level errors in version 1.
Hence, the top level error must be returned at the partition level.

Side note: It is a tad annoying that we create error response in
multiple places (e.g. KafkaApis, Group CoordinatorService). There were a
reason for this but I cannot remember.

Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Sean Quah <squah@confluent.io>, Ken Huang <s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>
2025-06-26 06:29:43 -07:00
Lucas Brutschy 23ddb1d8ac
MINOR: Reject requests using unsupported features in KIP-1071 (#20031)
CI / build (push) Waiting to run Details
KIP-1071 does not currently support all features planned in the KIP. We
should reject any requests that are using features that are currently
not implemented.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Matthias J. Sax
 <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
2025-06-25 14:48:56 +02:00
Rajini Sivaram 33a1648c44
MINOR: Fix response for consumer group describe with empty group id (#20030)
ConsumerGroupDescribe with an empty group id returns a response containing `null` groupId in a non-nullable field. Since the response cannot be serialized, this results in UNKNOWN_SERVER_ERROR being returned to the client. This PR sets the group id in the response to an empty string instead and adds request tests for empty group id.

Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2025-06-25 10:33:44 +01:00
Sushant Mahajan 815dd93e2f
MINOR: Invoke share group rebalance sensor. (#20006)
* The share group rebalance metric was not being invoked at the
appropriate group id bump position.
* This PR solves the issue.
* The metric name has been updated
(s/rebalance-rate/share-group-rebalance-rate,
s/rebalance-count/share-group-rebalance-count/)
* Updated tests in `GroupMetadataManagerTest` and
`GroupCoordinatorMetricsTest`

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-06-21 08:35:19 +01:00
Andrew Schofield 4690527fab
KAFKA-19362: Finalize homogeneous simple share assignor (#19977)
Finalise the share group SimpleAssignor for homogeneous subscriptions.
The assignor code is much more accurate about the number of partitions
assigned to each member, and the number of members assigned for each
partition. It eliminates the idea of hash-based assignment because that
has been shown to the unhelpful. The revised code is very much more
effective at assigning evenly as the number of members grows and shrinks
over time.

A future PR will address the code for heterogeneous subscriptions.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
2025-06-20 16:10:47 +01:00
Dongnuo Lyu af012e1ec2
KAFKA-18961: Time-based refresh for server-side RE2J regex (#19904)
Consumers can subscribe to an RE2J SubscriptionPattern that will be
resolved and maintained on the server-side (KIP-848). Currently, those
regexes are refreshed on the coordinator when a consumer subscribes to a
new regex, or if there is a new topic metadata image (to ensure regex
resolution stays up-to-date with existing topics)

But with
[KAFKA-18813](https://issues.apache.org/jira/browse/KAFKA-18813), the
topics matching a regex are filtered based on ACLs. This generates a new
situation, as regexes resolution do not stay up-to-date as topics become
visible (ACLs added/delete).

This patch introduces time-based refresh for the subscribed regex by
- Adding internal `group.consumer.regex.batch.refresh.max.interval.ms`
config
that controls the refresh interval.
- Schedule a regex refresh when updating regex subscription if the
latest refresh is older than the max interval.

Reviewers: David Jacot <djacot@confluent.io>
2025-06-12 04:54:39 -07:00
Jhen-Yung Hsu 2e968560e0
MINOR: Cleanup simplify set initialization with Set.of (#19925)
Simplify Set initialization and reduce the overhead of creating extra
collections.

The changes mostly include:
- new HashSet<>(List.of(...))
- new HashSet<>(Arrays.asList(...)) / new HashSet<>(asList(...))
- new HashSet<>(Collections.singletonList()) / new
HashSet<>(singletonList())
- new HashSet<>(Collections.emptyList())
- new HashSet<>(Set.of())

This change takes the following into account, and we will not change to
Set.of in these scenarios:
- Require `mutability` (UnsupportedOperationException).
- Allow `duplicate` elements (IllegalArgumentException).
- Allow `null` elements (NullPointerException).
- Depend on `Ordering`. `Set.of` does not guarantee order, so it could
make tests flaky or break public interfaces.

Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
 <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
2025-06-11 18:36:14 +08:00
PoAn Yang 844b0e651b
KAFKA-19369: Add group.share.assignors config and integration test (#19900)
CI / build (push) Waiting to run Details
* Add `group.share.assignors` config to `GroupCoordinatorConfig`.
* Send `rackId` in share group heartbeat request if it's not null.
* Add integration test `testShareConsumerWithRackAwareAssignor`.

Reviewers: Lan Ding <53332773+DL1231@users.noreply.github.com>, Andrew
 Schofield <aschofield@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
2025-06-06 14:20:56 +01:00
PoAn Yang e0adec5549
KAFKA-19290: Exploit mapKey optimisation in protocol requests and responses (wip) (#19815)
The mapKey optimisation can be used in some KIP-932 RPC schemas to
improve efficiency of some key-based accesses.

* AlterShareGroupOffsetsResponse
* ShareFetchRequest
* ShareFetchResponse
* ShareAcknowledgeRequest
* ShareAcknowledgeResponse

Reviewers: Andrew Schofield <aschofield@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
2025-06-06 14:19:08 +01:00
Lucas Brutschy 25bc5f2cfa
KAFKA-19372: StreamsGroup not subscribed to a topic when empty (#19901)
We should behave more like a consumer group and make sure to not be
subscribed to the input topics anymore when the last member leaves the
group. We don't do this right now because our topology is still
initialized even after the last member leaves the group.

This will allow:
* Offsets to expire and be cleaned up.
* Offsets to be deleted through admin API calls.

Reviewers: Bill Bejeck <bbejeck@apache.org>
2025-06-04 20:55:14 +02:00
Lucas Brutschy 678d456ad7
KAFKA-19044: Handle tasks that are not present in the current topology (#19722)
A heartbeat might be sent to the group coordinator, claiming to own
tasks that we do  not know about. We need some logic to handle those
requests. In KIP-1071, we propose  to return `INVALID_REQUEST` error
whenever this happens, effectively letting the  clients crash.

This behavior will, however, make topology updates impossible. Bruno
Cadonna proposed  to only check that owned tasks match our set of
expected tasks if the topology epochs  between the group and the client
match. The aim of this change is to implement a  check and a behavior
for the first version of the protocol, which is to always  return
`INVALID_REQUEST` if an unknown task is sent to the group coordinator.

We can relax this constraint once we allow topology updating with
topology epochs.

To efficiently check this whenever we receive a heartbeat containing
tasks, we precompute the number of tasks for each subtopology. This also
benefits the performance of the assignor.

Reviewers: Bill Bejeck <bbejeck@apache.org>
2025-06-04 20:22:52 +02:00
PoAn Yang 2977cb17d0
KAFKA-17747: [6/N] Replace subscription metadata with metadata hash in share group (#19796)
* Use metadata hash to replace subscription metadata.
* Remove `ShareGroupPartitionMetadataKey` and
`ShareGroupPartitionMetadataValue`.
* Use `subscriptionTopicNames` and `metadataImage` to replace
`subscriptionMetadata` in `subscribedTopicsChangeMap` function.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot
<djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
2025-06-03 16:30:39 +01:00
Andrew Schofield 016a4a6c4c
KAFKA-19353: Upgrade note and initial docs for KIP-932 (#19863)
CI / build (push) Waiting to run Details
This is the initial documentation for KIP-932 preview in AK 4.1. The aim
is to get very minimal docs in before the cutoff. Longer term, more
comprehensive documentation will be provided for AK 4.2.

The PR includes:
* Generation of group-level configuration documentation
* Add link to KafkaShareConsumer to API docs
* Add a summary of share group rational to design docs
* Add basic operations information for share groups to ops docs
* Add upgrade note describing arrival of KIP-932 preview in 4.1

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>

---------

Co-authored-by: Apoorv Mittal <apoorvmittal10@gmail.com>
2025-06-03 13:23:11 +01:00
PoAn Yang 425f028556
KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (#19802)
* Use metadata hash to replace subscription metadata.
* Remove `StreamsGroupPartitionMetadataKey` and
`StreamsGroupPartitionMetadataValue`.
* Check whether `configuredTopology` is empty. If it's, call
`InternalTopicManager.configureTopics` and set the result to the group.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
2025-06-03 13:21:34 +02:00
Apoorv Mittal a70a667e95
MINOR: Fixing logs and adding exception in response (#19859)
PR streamlines the logs when delete share group or offset is triggered.
Also fixes the response when group is not found while deleting share
group.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Sushant Mahajan
 <smahajan@confluent.io>
2025-06-02 15:10:51 +01:00
Sean Quah 8323168b57
MINOR: Minor updates to RangeSet (#19678)
Minor updates to RangeSet:   * Disallow ranges with negative size   *
Disallow ranges with more than Integer.MAX_VALUE elements   * Fix
equals() so that all empty RangeSets are equal, to follow the Set
interface definition better.   * Reimplement hashCode() to follow the
Set interface definition.

Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
2025-05-31 02:31:51 +08:00
Sushant Mahajan 383a9ff9df
KAFKA-19344: Replace desc.assignablePartitions with spec.isPartitionAssignable. (#19838)
CI / build (push) Waiting to run Details
- A new method `assignablePartitions` was added to the
`SubscribedTopicDescriber`in https://github.com/apache/kafka/pull/19026.
This method was required for computing assignments for share groups
(KIP-932).
- However, since the describer is a public interface and is used to
encapsulate methods which return all subscribed partitions (KIP-848),
`assignablePartitions` is deemed inconsistent with this interface.
- Hence, this PR extends the `GroupSpec` interface to add a method
`isPartitionAssignable` which will serve the same purpose. The
`assignablePartitions` has been removed from the describer.
- Tests have been updated for the assigners and spec and removed from
describer as required.

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot
 <djacot@confluent.io>
2025-05-28 20:27:29 +01:00
PoAn Yang d6ee83a893
KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group (#19761)
* Add `topicHashCache` to `GroupMetadataManager`.
* Remove topic hash from cache if related topic image is updated.
* Ignore topic hash 0 when calculating group metadata hash.
* Add `metadataHash` to `ModernGroup`.
* Replace subscription metadata with metadata hash.
* If there is data in `ConsumerGroupPartitionMetadataValue`, set a flag
in group to add tombstone record in next group heartbeat.

Reviewers: David Jacot <djacot@confluent.io>
2025-05-28 05:56:46 -07:00
Alieh Saeedi a3d5ca07f8
MINOR: Change `Streams group` to `streams group` (#19813)
As of https://github.com/apache/kafka/pull/19758#discussion_r2097734386,
the capitalization across all messages are aligned.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Andrew Schofield
 <aschofield@confluent.io>
2025-05-28 09:56:29 +02:00
David Jacot 25031373da
KAFKA-19154; Offset Fetch API should return INVALID_OFFSET if requested topic id does not match persisted one (#19744)
CI / build (push) Waiting to run Details
This patch updates the OffsetFetch API to ensure that a committed offset
is returned iff the requested topic id matches the persisted one; the
invalid offset is returned otherwise.

Reviewers: Lianet Magrans <lmagrans@confluent.io>
2025-05-27 16:15:06 +02:00
Dongnuo Lyu fcb722dc88
KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group (#19790)
CI / build (push) Waiting to run Details
When a consumer protocol static member replaces an existing member in a
classic group, it's not necessary to recompute the assignment. However,
it happens anyway.

In

[ConsumerGroup.fromClassicGroup](0ff4dafb7d/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java (L1140)),
we don't set the group's subscriptionMetadata.  Later in the consumer
group heartbeat, we [call

updateSubscriptionMetadata](0ff4dafb7d/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java (L1748)),
which [notices that the group's subscriptionMetadata needs an

update](0ff4dafb7d/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java (L2757))
and bumps the epoch. Since the epoch is bumped, we [recompute the

assignment](0ff4dafb7d/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java (L1766)).

As a fix, this patch sets the subscriptionMetadata in
ConsumerGroup.fromClassicGroup.

Reviewers: Sean Quah <squah@confluent.io>, David Jacot <djacot@confluent.io>
2025-05-27 02:25:57 -07:00
Jhen-Yung Hsu 651f86b77e
MINOR: Remove unused mkMapOfPartitionRacks method (#19797)
The mkMapOfPartitionRacks in ServerSideAssignorBenchmark.java was
introduced in 8013657f5d,  and the one in
GroupCoordinatorRecordHelpersTest.java was introduced in
3709901c9e.

Both have not been used since bb97d63d41.

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
2025-05-26 02:54:17 +08:00
Sushant Mahajan af4d048da6
MINOR: Bugfix in GroupMetadataManager.testShareGroupInitializeSuccess. (#19795)
Currently, we were asserting on records containing set using
`assertEquals` which can fail intermittently. To fix the assertion has
been replaced by `assertRecordEquals`.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai
 <chia7712@gmail.com>
2025-05-23 23:54:38 +08:00
Ming-Yen Chung e107e69a51
HOTFIX: Fix GroupMetadataManager#completeAlterShareGroupOffsets to use InitMapValue in addInitializingTopicsRecords (#19792)
https://github.com/apache/kafka/pull/19781/files#diff-00f0f81cf13e66781777d94f7d2e68a581663385c37e98792507f2294c91bb09L2746-R2745
changes the `addInitializingTopicsRecords` signature while
https://github.com/apache/kafka/pull/18929/files#r2104172356 didn't make
adjustment accordingly.

Fix GroupMetadataManager#completeAlterShareGroupOffsets to use
`InitMapValue` in `initializingTopics` so  that
`addInitializingTopicsRecords` can accept it.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Sushant Mahajan <sushant.mahajan88@gmail.com>, Kuan-Po Tseng <brandboat@gmail.com>, TengYao Chi <frankvicky@apache.org>, Shivsundar R <shr@confluent.io>, PoAn Yang <payang@apache.org>, Nick Guo <lansg0504@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>
2025-05-23 13:07:37 +01:00
jimmy b44bfca408
KAFKA-16717 [2/N]: Add AdminClient.alterShareGroupOffsets (#18929)
[KAFKA-16720](https://issues.apache.org/jira/browse/KAFKA-16720) aims to
finish the AlterShareGroupOffsets RPC.

Reviewers: Andrew Schofield <aschofield@confluent.io>

---------

Co-authored-by: jimmy <wangzhiwang@qq.com>
2025-05-23 09:05:48 +01:00
Sushant Mahajan c58de75712
KAFKA-19204: Add timestamp to share state metadata init maps [1/N] (#19781)
1.  Currently, the code allows for retrying any initializing topics in
subsequent heartbeats. This can result in duplicate calls to persister
if multiple share consumers join the same group concurrently.
Furthermore, only one of these will succeed as the others will have a
lower state epoch and will be fenced.
2. The existing change was made in
https://github.com/apache/kafka/pull/19603 to allow for retrying
initialization of initializing topics, in case the original caller was
not able to persist the information in the persister due to a dead
broker/timeout.
3. To prevent multiple calls as well as allow for retry we have
supplemented the timelinehashmap holding the
`ShareGroupStatePartitionMetadataInfo` to also hold the timestamp at
which this record gets replayed.
  a. Now when we get multiple consumers for the same group and topic,
only one of them is allowed to make the persister initialize request and
this information is added to the map when it is replayed. Thus solving
issue 1.
  b. To allow for retries, if an initializing topic is found with a
timestamp which is older than 2*offset_write_commit_ms, that topic will
be allowed to be retried. Here too only one consumer would be able to
retry thus resolving issue 2 as well.
4. Tests have been added wherever applicable and existing ones updated.
5. No record schema changes are involved.
6. The `ShareGroupStatePartitionMetadataInfo` and `InitMapValue` records
have been moved to the `ShareGroup` class for better encapsulation.
7. Some logs have been changed from error to info in
`ShareCoordinatorShard` and extra information is logged.

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-05-23 08:56:05 +01:00
PoAn Yang c493d89334
KAFKA-17747: [3/N] Get rid of TopicMetadata in SubscribedTopicDescriberImpl (#19611)
CI / build (push) Waiting to run Details
Replace `TopicMetadata` with `MetadataImage` in
`SubscribedTopicDescriberImpl` and  `TargetAssignmentBuilder`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot
 <djacot@confluent.io>
2025-05-19 20:46:24 +08:00
Lucas Brutschy f26974b16d
KAFKA-19202: Enable KIP-1071 in streams_eos_test (#19700)
CI / build (push) Waiting to run Details
Enable next system test with KIP-1071.

Some of the validation inside the test did not make sense for KIP-1071.
This is because in KIP-1071, if a member leaves or joins the group, not
all members may enter a REBALANCING state. We use the wrapper introduced
in   [KAFKA-19271](https://issues.apache.org/jira/browse/KAFKA-19271)
to print a log line whenever the member epoch is bumped, which is  the
only way a member can "indirectly" observe that other members  are
rebalancing.

Reviewers: Bill Bejeck <bill@confluent.io>
2025-05-17 21:20:47 +02:00
Bill Bejeck f397cbc14c
KAFKA-19256: Only send IQ metadata on assignment changes (#19691)
CI / build (push) Waiting to run Details
This PR adds changes, so the IQ endpoint information is only sent to
streams group members when there has been a change in the assignments
requiring an update in the streams client host-partition ownership.

The existing IQ integration test passes with no modifications and
updated the `GroupMetadataManagerTest` to cover the new process.

Reviewers: Matthias Sax <mjsax@apache.org>, Lucas Brutschy
 <lucasbru@apache.org>
2025-05-16 16:54:12 -04:00
David Jacot 199772adc5
KAFKA-19141; Persist topic id in OffsetCommit record (#19683)
This patch adds the `TopicId` field to the `OffsetCommitValue` record as
a tagged field. It will be later used on the offset fetch path to ensure
that the persisted offset matches the requested one.

Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Sean Quah
 <squah@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
2025-05-16 10:26:36 -04:00
PoAn Yang c26b09c609
KAFKA-18904: [1/N] Change ListClientMetricsResources API to ListConfigResources (#19493)
* Change `ListClientMetricsResourcesRequest.json` to
`ListConfigResourcesRequest.json`.
* Change `ListClientMetricsResourcesResponse.json` to
`ListConfigResourcesResponse.json`.
* Change `ListClientMetricsResourcesRequest.java` to
`ListConfigResourcesRequest.java`.
* Change `ListClientMetricsResourcesResponse.java` to
`ListConfigResourcesResponsejava`.
* Change `KafkaApis` to handle both `ListClientMetricsResourcesRequest`
v0 and v1 requests.

Reviewers: Andrew Schofield <aschofield@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
2025-05-15 23:39:00 +01:00
Sushant Mahajan a26d803f22
MINOR: Remove share group code from group coord onElection. (#19730)
* Previously we had added code to `GroupCoordinatorService.onElection`
to reconcile pending share group initializing topics. This was done to
manage state in case of failovers and broker failures.
* However, we later modified share group heartbeat code to do the retry
to clean up the state and the `onElection` code is now redundant.
* In this PR we are cleaning up this code.

Reviewers: David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>
2025-05-15 07:27:25 -07:00
PoAn Yang a1008dc85d
KAFKA-17747: [2/N] Add compute topic and group hash (#19523)
* Add `com.dynatrace.hash4j:hash4j:0.22.0` to dependencies.
* Add `computeTopicHash` to `org.apache.kafka.coordinator.group.Utils`.
  * If topic name is non-existent, return 0.
  * If topic name is existent, use streaming XXH3 to compute topic hash
with magic byte, topic id, topic name, number of partitions, partition
id and sorted racks.
* Add `computeGroupHash` to `org.apache.kafka.coordinator.group.Utils`.
  * If topic map is empty, return 0.
  * If topic map is not empty, use streaming XXH3 to compute group
metadata hash with sorted topic hashes by topic names.
* Add related unit test.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>, Sean Quah <squah@confluent.io>, David Jacot <djacot@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
2025-05-15 10:48:45 +02:00
Sean Quah c16c240bd1
KAFKA-18688: Fix uniform homogeneous assignor stability (#19677)
When the number of partitions is not divisible by the number of members,
some members will end up with one more partition than others.
Previously, we required these to be the members at the start of the
iteration order, which meant that partitions could be reassigned even
when the previous assignment was already balanced.

Allow any member to have the extra partition, so that we do not move
partitions around when the previous assignment is already balanced.

Before the PR
```
Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
ServerSideAssignorBenchmark.doAssignment              FULL           RANGE          false          10000                         50         HOMOGENEOUS          1000  avgt    2   26.175          ms/op
ServerSideAssignorBenchmark.doAssignment              FULL           RANGE          false          10000                         50       HETEROGENEOUS          1000  avgt    2  123.955          ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         50         HOMOGENEOUS          1000  avgt    2   24.408          ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         50       HETEROGENEOUS          1000  avgt    2  114.873          ms/op
```
After the PR
```
Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
ServerSideAssignorBenchmark.doAssignment              FULL           RANGE          false          10000                         50         HOMOGENEOUS          1000  avgt    2   24.259          ms/op
ServerSideAssignorBenchmark.doAssignment              FULL           RANGE          false          10000                         50       HETEROGENEOUS          1000  avgt    2  118.513          ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         50         HOMOGENEOUS          1000  avgt    2   24.636          ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         50       HETEROGENEOUS          1000  avgt    2  115.503          ms/op
```

Reviewers: David Jacot <djacot@confluent.io>
2025-05-13 08:01:14 -07:00
Sushant Mahajan bf53561d16
KAFKA-19201: Handle deletion of user topics part of share partitions. (#19559)
* Currently even if a user topic is deleted, its related records are not
deleted with respect to subscribed share groups from the GC and the SC.
* The event of topic delete is propagated from the
BrokerMetadataPublisher to the coordinators via the
`onPartitionsDeleted` method. This PR leverages this method to issue
cleanup calls to the GC and SC respectively.
* To prevent chaining of futures in the GC, we issue async calls to both
GC and SC independently and the methods take care of the respective
cleanups unaware of the other.
* This method is more efficient and transcends issues related to
timeouts/broker restarts resulting in chained future execution issues.

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-05-13 14:22:17 +01:00
David Jacot d154a314e7
KAFKA-14691; Add TopicId to OffsetFetch API (#19515)
This patch extends the OffsetFetch API to support topic ids. From
version 10 of the API, topic ids must be used.

The patch only contains the server side changes and it keeps the version
10 as unstable for now. We will mark the version as stable when the
client side changes are merged in.

Reviewers: TengYao Chi <frankvicky@apache.org>, Lianet Magrans <lmagrans@confluent.io>
2025-05-13 15:10:10 +02:00
Apoorv Mittal b4b73c604b
KAFKA-19245: Updated default locks config for share group (#19705)
Updated default locks config for share groups from 200 to 2000. The
increase in the limit is a result from tests which showed that with
default maxFetchRecords of 500 from client and 200 as locks limit, there
can't be parallel fetch for same partition. Also the tests resulted that
sharing a partition to an index of 3-4 is easily achievable, hence
raised the limit to 4 times of default limit of maxFetchRecords (500).

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-05-13 13:46:25 +01:00
Sean Quah 1293658cca
KAFKA-19163: Avoid deleting groups with pending transactional offsets (#19496)
When a group has pending transactional offsets but no committed offsets,
we can accidentally delete it while cleaning up expired offsets. Add a
check to avoid this case.

Reviewers: David Jacot <djacot@confluent.io>
2025-05-13 05:10:26 -07:00
Sean Quah eb3714f022
KAFKA-19160;KAFKA-19164; Improve performance of fetching stable offsets (#19497)
CI / build (push) Waiting to run Details
When fetching stable offsets in the group coordinator, we iterate over
all requested partitions. For each partition, we iterate over the
group's ongoing transactions to check if there is a pending
transactional offset commit for that partition.

This can get slow when there are a large number of partitions and a
large number of pending transactions. Instead, maintain a list of
pending transactions per partition to speed up lookups.

Reviewers: Shaan, Dongnuo Lyu <dlyu@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, David Jaco <djacot@confluent.io>
2025-05-12 00:32:17 -07:00
Andrew Schofield 7d027a4d83
KAFKA-19218: Add missing leader epoch to share group state summary response (#19602)
CI / build (push) Waiting to run Details
When the persister is responding to a read share-group state summary
request, it has no way of including the leader epoch in its response,
even though it has the information to hand. This means that the leader
epoch information is not initialised in the admin client operation to
list share group offsets, and this then means that the information
cannot be displayed in kafka-share-groups.sh.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Sushant Mahajan
 <smahajan@confluent.io>
2025-05-06 14:53:12 +01:00
Sushant Mahajan e68781414e
KAFKA-19204: Allow persister retry of initializing topics. (#19603)
CI / build (push) Waiting to run Details
* Currently in the share group heartbeat flow, if we see a TP subscribed
for the first time, we move that TP to initializing state in GC and let
the GC send a persister request to share group initialize the
aforementioned TP.
* However, if the coordinator runtime request for share group heartbeat
times out (maybe due to restarting/bad broker), the future completes
exceptionally resulting in persiter request to not be sent.
* Now, we are in a bad state since the TP is in initializing state in GC
but not persister initialized. Future heartbeats for the same share
partitions will also not help since we do not allow retrying persister
request for initializing TPs.
* This PR remedies the situation by allowing the same.
* A temporary fix to increase offset commit timeouts in system tests was
added to fix the issue. In this PR, we revert that change as well.

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-05-02 14:25:29 +01:00
Chirag Wadhwa b8de78cafd
KAFKA-19210: resolved the flakiness in testShareGroupHeartbeatInitializeOnPartitionUpdate (#19601)
The test testShareGroupHeartbeatInitializeOnPartitionUpdate was flaky
earlier. The shareGroupStatePartitionMetadataRecord that is created
during heartbeat contains 2 topics to be initialized, but the order in
which they appear in the list is not deterministic. The test is changed
to simply see whether the contents of the record is correct instead of
directly comparing it with an expected record which may contains the
correct topics, but in some different order.

Reviewers: Sushant Mahajan <smahajan@confluent.io>, Andrew Schofield <aschofield@confluent.io>
2025-04-30 16:07:55 +01:00
Andrew Schofield ce97b1d5e7
KAFKA-16894: Exploit share feature [3/N] (#19542)
This PR uses the v1 of the ShareVersion feature to enable share groups
for KIP-932.

Previously, there were two potential configs which could be used -
`group.share.enable=true` and including "share" in
`group.coordinator.rebalance.protocols`. After this PR, the first of
these is retained, but the second is not. Instead, the preferred switch
is the ShareVersion feature.

The `group.share.enable` config is temporarily retained for testing and
situations in which it is inconvenient to set the feature, but it should
really not be necessary, especially when we get to AK 4.2. The aim is to
remove this internal config at that point.

No tests should be setting `group.share.enable` any more, because they
can use the feature (which is enabled in test environments by default
because that's how features work). For tests which need to disable share
groups, they now set the share feature to v0. The majority of the code
changes were related to correct initialisation of the metadata cache in
tests now that a feature is used.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
2025-04-30 13:27:01 +01:00
Matthias J. Sax b0a26bc2f4
KAFKA-19173: Add `Feature` for "streams" group (#19509)
Add new StreamsGroupFeature, disabled by default,  and add "streams" as
default value to `group.coordinator.rebalance.protocols`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot
<david.jacot@gmail.com>, Lucas Brutschy <lbrutschy@confluent.io>,
Justine Olshan <jolshan@confluent.io>, Andrew Schofield
<aschofield@confluent.io>, Jun Rao <jun@confluent.io>
2025-04-29 22:51:10 -07:00
Bill Bejeck 431cffc93f
KAFKA-19135 Migrate initial IQ support for KIP-1071 from feature branch to trunk (#19588)
This PR is a migration of the initial IQ support for KIP-1071 from the
feature branch to trunk.  It includes a parameterized integration test
that expects the same results whether using either the classic or new
streams group protocol.

Note that this PR will deliver IQ information in each heartbeat
response.  A follow-up PR will change that to be only sending IQ
information when assignments change.

Reviewers Lucas Brutschy <lucasbru@apache.org>
2025-04-29 20:08:49 -04:00
Chirag Wadhwa 2f9c2dd828
KAFKA-16718-3/n: Added the ShareGroupStatePartitionMetadata record during deletion of share group offsets (#19478)
This is a follow up PR for implementation of DeleteShareGroupOffsets
RPC. This PR adds the ShareGroupStatePartitionMetadata record to
__consumer__offsets topic to make sure the topic is removed from the
initializedTopics list. This PR also removes partitions from the request
and response schemas for DeleteShareGroupState RPC

Reviewers: Sushant Mahajan <smahajan@confluent.io>, Andrew Schofield <aschofield@confluent.io>
2025-04-25 22:01:48 +01:00
Matthias J. Sax 6462f7a0e2
MINOR: code cleanup GroupMetadataManagerTestContext (#19554)
Removes redundant calls, and updates code to Java 17.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2025-04-25 10:12:55 -07:00
David Jacot 5a9681651f
MINOR: Move request static validations to GroupCoordinatorService (#19556)
This patches moves the static request validations from the
`GroupMetadataManager` to the `GroupCoordinatorService`. We already had
static validation in the service for other requests so it makes sense to
consolidate all the static validations at the same place. Moreover, it
also prevents faulty requests from unnecessarily using group
coordinator's resources.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Andrew Schofield <aschofield@confluent.io>
2025-04-25 08:32:33 -07:00
Lucas Brutschy b3161ba1d1
HOTFIX: GroupMetadataManagerTest fails to compile (#19558)
Commit 369cc56 added a new parameter to newStreamsGroupEpochRecord, but
did not update the test that was added in 732ed06, breaking compilation.

Reviewers: David Jacot <djacot@confluent.io>
2025-04-25 11:16:46 +02:00