Commit Graph

442 Commits

Author SHA1 Message Date
Sean Quah 71a7d85955
KAFKA-19431: Ensure consumer and share assignment consistency with subscriptions (#20055)
Filter out unsubscribed topics during reconciliation.

This eliminates the window where a consumer group assignment could
contain unsubscribed topics when a member unsubscribes from a topic
while it has unrevoked partitions.

We also apply filtering in a few other cases that would arise when
client-side assignors are implemented, since new assignments would no
longer be available immediately. This is important for mixed groups,
since clients on the classic protocol will rejoin if they receive a
topic in their assignment that is no longer in their subscription.

Regex subscriptions have a window where the regex is not resolved and we
cannot know which topics are part of the subscription. We opt to be
conservative and treat unresolved regexes as matching no topics.

The same change is applied to share groups, since the reconciliation
process is similar.

To gauge the performance impact of the change, we add a jmh benchmark.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Lianet Magran  <lmagrans@confluent.io>, Sushant Mahajan <smahajan@confluent.io>, Dongnuo Lyu <dlyu@confluent.io>, David Jacot <djacot@confluent.io>
2025-10-06 14:57:44 +02:00
Lan Ding c2aeec46a2
MINOR: Remove logContext arrtibute from StreamsGroup and CoordinatorRuntime (#20572)
CI / build (push) Waiting to run Details
The `logContext` attribute in `StreamsGroup` and `CoordinatorRuntime` is
not used anymore.  This patch removes it.

Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
 <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2025-09-29 01:37:58 +08:00
Lucas Brutschy fb0518c34e
KAFKA-19730: StreamsGroupDescribe result is missing topology (#20574)
When toology not configured.

In the streams group heartbeat, we validate the topology set for the
group against the topic metadata, to generate the "configured topology"
which has a specific number of partitions for each topic.

In streams group describe, we use the configured topology to expose this
information to the user. However, we leave the topology information as
null in the streams group describe response, if the topology is not
configured. This triggers an IllegalStateException in the admin client
implementation.

Instead, we should expose the unconfigured topology when the configured
topology is not available, which will still expose useful information.

Reviewers: Matthias J. Sax <matthias@confluent.io>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-09-26 19:40:47 +02:00
Dongnuo Lyu cbea4f69bd
KAFKA-19546: Rebalance should be triggered by subscription change during group protocol downgrade (#20417)
During online downgrade, when a static member using the consumer
protocol which is also the last member using the consumer protocol is
replaced by another static member using the classic protocol with the
same instance id, the latter will take the assignment of the former and
an online downgrade will be triggered.

In the current implementation, if the replacing static member has a
different subscription, no rebalance will be triggered when the
downgrade happens. The patch checks whether the static member has
changed subscription and triggers a rebalance when it does.

Reviewers: Sean Quah <squah@confluent.io>, David Jacot
 <djacot@confluent.io>
2025-09-24 15:40:45 +02:00
Lan Ding cfa0b416ef
MINOR: Remove metrics attribute from StreamsGroup (#20559)
The `metrics` attribute in `StreamsGroup` is not used anymore. This
patch removes it.

Reviewers: Ken Huang <s7133700@gmail.com>, Lucas Brutschy
 <lbrutschy@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2025-09-19 16:32:41 +08:00
David Jacot 8c8e93c4a1
MINOR: Remove metrics attribute from ConsumerGroup (#20542)
The `metrics` attribute in `ConsumerGroup` is not used anymore. This
patch removes it.

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai
 <chia7712@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Dongnuo Lyu
 <dlyu@confluent.io>
2025-09-18 11:10:35 +02:00
Lucas Brutschy 8628d74c49
KAFKA-19661 [6/N]: Use heaps also on the process-level (#20523)
In the current solution, we only use a heap to select the right process,
but resort to linear search for selecting a member within a process.
This means use cases where a lot of threads run within the same process
can yield slow assignment. The number of threads in a process shouldn’t
scale arbitrarily (our assumed case for benchmarking of 50 threads in a
single process seems quite extreme already), however, we can optimize
for this case to reduce the runtime further.

Other assignment algorithms assign directly on the member-level, but we
cannot do this in Kafka Streams, since we cannot assign tasks to
processes that already own the task. Defining a heap directly on members
would mean that we may have to skip through 10s of member before finding
one that does not belong to a process that does not yet own the member.

Instead, we can define a separate heap for each process, which keeps the
members of the process by load. We can only keep the heap as long as we
are only changing the load of the top-most member (which we usually do).
This means we keep track of a lot of heaps, but since heaps are backed
by arrays in Java, this should not result in extreme memory
inefficiencies.

In our worst-performing benchmark, this improves the runtime by ~2x on
top of the optimization above.

Also piggybacked are some minor optimizations / clean-ups:   -
initialize HashMaps and ArrayLists with the right capacity   - fix some
comments   - improve logging output

Note that this is a pure performance change, so there are no changes to
the unit tests.

Reviewers: Bill Bejeck<bbejeck@apache.org>
2025-09-15 17:19:53 +02:00
Lucas Brutschy 351203873d
KAFKA-19661 [5/N]: Use below-quota as a condition for standby task assignment (#20458)
In the original algorithm, standby tasks are assigned to a  process that
previously owned the task only if it is  “load-balanced”, meaning the
process has fewer tasks that  members, or it is the least loaded
process. This is strong  requirement, and will cause standby tasks to
often not get  assigned to process that previously owned it.
Furthermore,  the condition “is the least loaded process” is hard to
evaluate efficiently in this context.

We propose to instead use the same “below-quota” condition  as in active
task assignment.

We compute a quota for active and standby tasks, by definiing numOfTasks
= numberOfActiveTasks+numberOfStandbyTasks and  defining the quota as
numOfTasks/numberOfMembers rounded up.  Whenever a member becomes “full”
(its assigned number of tasks  is equal to numOfTasks) we deduct its
tasks from numOfTasks and  decrement numberOfMembers and recompute the
quota, which means  that the quota may be reduced by one during the
assignment  process, to avoid uneven assignments.

A standby task can be assigned to a process that previously  owned it,
whenever the process has fewer than  numOfMembersOfProcess*quota.

This condition will, again, prioritize standby stickyness,  and can be
evaluated in constant time.

In our worst-performing benchmark, this improves the runtime  by 2.5x on
top of the previous optimizations, but 5x on the  more important
incremental assignment case.

Reviewers: Bill Bejeck <bbejeck@apache.org>
2025-09-10 17:55:51 +02:00
Lucas Brutschy 620a01b74b
KAFKA-19661 [4/N]: Prefer range-style assignment (#20486)
This is actually fixing a difference between the old and the new
assignor. Given the assignment ordering, the legacy assignor has a
preference for range-style assignments built in, that is, assigning

C1: 0_0, 1_0  C2: 0_1, 1_1

instead of

C1: 0_0, 0_1  C2: 1_0, 1_1

We add tests to both assignors to check for this behavior, and improve
the new assingor by enforcing corresponding orderings.

Reviewers: Bill Bejeck <bill@confluent.io>
2025-09-09 10:44:37 +02:00
Ken Huang 0a12eaa80e
KAFKA-19112 Unifying LIST-Type Configuration Validation and Default Values (#20334)
We add the three main changes in this PR

- Disallowing null values for most LIST-type configurations makes sense,
since users cannot explicitly set a configuration to null in a
properties file. Therefore, only configurations with a default value of
null should be allowed to accept null.
- Disallowing duplicate values is reasonable, as there are currently no
known configurations in Kafka that require specifying the same value
multiple times. Allowing duplicates is both rare in practice and
potentially confusing to users.
- Disallowing empty list, even though many configurations currently
accept them. In practice, setting an empty list for several of these
configurations can lead to server startup failures or unexpected
behavior. Therefore, enforcing non-empty lists helps prevent
misconfiguration and improves system robustness.
These changes may introduce some backward incompatibility, but this
trade-off is justified by the significant improvements in safety,
consistency, and overall user experience.

Additionally, we introduce two minor adjustments:

- Reclassify some STRING-type configurations as LIST-type, particularly
those using comma-separated values to represent multiple entries. This
change reflects the actual semantics used in Kafka.
- Update the default values for some configurations to better align with
other configs.
These changes will not introduce any compatibility issues.

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
2025-09-06 01:25:55 +08:00
Lucas Brutschy 6247fd9eb3
KAFKA-19478 [3/N]: Use heaps to discover the least loaded process (#20172)
The original implementation uses a linear search to find the least
loaded process in O(n), and we can replace this by look-ups in a heap is
O(log(n)), as described below

Active tasks: For active tasks, we can do exactly the same assignment as
in the original algorithm by first building a heap (by load) of all
processes. When we assign a task, we pick the head off the heap, assign
the task to it, update the load, and re-insert it into the heap in
O(log(n)).

Standby tasks: For standby tasks, we cannot do this optimization
directly, because of the order in which we assign tasks:

1. We first try to assign task A to a process that previously owned A.
2. If we did not find such a process, we assign A to the least loaded
node.
3. We now try to assign task B to a process that previously owned B
4. If we did not find such a process, we assign B to the least loaded
node
   ...

The problem is that we cannot efficiently keep a heap (by load)
throughout this process, because finding and removing process that
previously owned A (and B and…) in the heap is O(n). We therefore need
to change the order of evaluation to be able to use a heap:

1. Try to assign all tasks A, B.. to a process that previously owned the
task
2. Build a heap.
3. Assign all remaining tasks to the least-loaded process that does not
yet own the task. Since at most NumStandbyReplicas already own the task,
we can do it by removing up to NumStandbyReplicas from the top of the
heap in O(log(n)), so we get O(log(NumProcesses)*NumStandbyReplicas).

Note that the change in order changes the resulting standby assignments
(although this difference does not show up in the existing unit tests).
I would argue that the new order of assignment will actually yield
better assignments, since the assignment will be more sticky, which has
the potential to reduce the amount of store we have to restore from the
changelog topic after assingments.

In our worst-performing benchmark, this improves the runtime by ~107x.

Reviewers: Bill Bejeck<bbejeck@apache.org>
2025-09-03 17:13:01 +02:00
Lucas Brutschy f621a635c1
KAFKA-19570: Implement offline migration for streams groups (#20288)
Offline migration essentially preserves offsets and nothing else. So
effectively write tombstones for classic group type when a streams
heartbeat is sent to with the group ID of an empty classic group, and
write tombstones for the streams group type when a classic consumer
attempts to join with a group ID of an empty streams group.

Reviewers: Bill Bejeck <bbejeck@apache.org>, Sean Quah
 <squah@confluent.io>, Dongnuo Lyu <dlyu@confluent.io>
2025-08-26 10:05:30 +02:00
Mickael Maison fd9b5514ad
MINOR: Cleanups in coordinator-common/group-coordinator (#20097)
- Use `record` where possible
- Use enhanced `switch`
- Tweak a bunch of assertions

Reviewers: Yung <yungyung7654321@gmail.com>, TengYao Chi
 <frankvicky@apache.org>, Ken Huang <s7133700@gmail.com>, Dongnuo Lyu
 <dlyu@confluent.io>, PoAn Yang <payang@apache.org>
2025-07-31 16:34:08 +08:00
jimmy dd784e7d7a
KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets (#19820)
[KAFKA-16717](https://issues.apache.org/jira/browse/KAFKA-16717) aims to
finish the AlterShareGroupOffsets for ShareGroupCommand part.

Reviewers: Lan Ding <isDing_L@163.com>, Chia-Ping Tsai
 <chia7712@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Andrew Schofield
 <aschofield@confluent.io>
2025-07-29 11:47:24 +01:00
Chirag Wadhwa 1ae4173601
KAFKA-19499: Corrected the logger name in PersisterStateManagerHandler (#20175)
This PR corrects the name of logger in the inner class
PersisterStateManagerHandler. After this change it will be possible to
change the log level dynamically in the kafka brokers. This PR also
includes a small change in a log line in GroupCoordinatorService, making
it clearer.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Sushant Mahajan
<smahajan@confluent.io>, Lan Ding <isDing_L@163.com>, TengYao Chi
<frankvicky@apache.org>
2025-07-24 12:28:58 +01:00
Elizabeth Bennett f81853ca88
KAFKA-19441: encapsulate MetadataImage in GroupCoordinator/ShareCoordinator (#20061)
CI / build (push) Waiting to run Details
The MetadataImage has a lot of stuff in it and it gets passed around in
many places in the new GroupCoordinator. This makes it difficult to
understand what metadata the group coordinator actually relies on and
makes it too easy to use metadata in ways it wasn't meant to be used. 

This change encapsulate the MetadataImage in an interface
(`CoordinatorMetadataImage`) that indicates and controls what metadata
the group coordinator actually uses. Now it is much easier at a glance
to see what dependencies the GroupCoordinator has on the metadata. Also,
now we have a level of indirection that allows more flexibility in how
the GroupCoordinator is provided the metadata it needs.
2025-07-18 08:16:54 +08:00
Lucas Brutschy 29cf97b9ad
KAFKA-19478 [2/N]: Remove task pairs (#20127)
CI / build (push) Waiting to run Details
Task pairs is an optimization that is enabled in the current sticky task
assignor.

The basic idea is that every time we add a task A to a client that has
tasks B, C, we add pairs (A, B) and (A, C) to a global collection of
pairs. When adding a standby task, we then prioritize creating standby
tasks that create new task pairs. If this does not work, we fall back to
the original behavior.

The complexity of this optimization is fairly significant, and its
usefulness is questionable, the HighAvailabilityAssignor does not seem
to have such an optimization, and the absence of this optimization does
not seem to have caused any problems that I know of. I could not find
any what this optimization is actually trying to achieve.

A side effect of it is that we will sometimes avoid “small loops”, such
as

        Node A: ActiveTask1, StandbyTask2 Node B: ActiveTask2,
StandbyTask1                            Node C: ActiveTask3,
StandbyTask4                            Node D: ActiveTask4,
StandbyTask3

So a small loop like this, worst case losing two nodes will cause 2
tasks to go down, so the assignor is preferring

        Node A: ActiveTask1, StandbyTask4 Node B: ActiveTask2,
StandbyTask1                            Node C: ActiveTask3,
StandbyTask2                            Node D: ActiveTask4,
StandbyTask3

Which is a “big loop” assignment, where worst-case losing two nodes will
cause at most 1 task to be unavailable. However, this optimization seems
fairly niche, and also the current implementation does not seem to
implement it in a direct form, but a more relaxed constraint which
usually, does not always avoid small loops. So it remains unclear
whether  this is really the intention behind the optimization. The
current unit  tests of the StickyTaskAssignor pass even after removing
the  optimization.

The pairs optimization has a worst-case quadratic space and time
complexity in the number of tasks, and make a lot of other optimizations
impossible, so I’d suggest we remove it. I don’t think, in its current
form, it is suitable to be implemented in a broker-side assignor. Note,
however, if we identify a useful effect of the code in the future, we
can work on finding an efficient algorithm that can bring the
optimization to our broker-side assignor.

This reduces the runtime of our worst case benchmark by 10x.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
2025-07-14 09:13:51 +02:00
Sushant Mahajan 8aa5eae2f9
KAFKA-19457: Make share group init retry interval configurable. (#20104)
* While creating share group init requests  in
`GroupMetadataManager.shareGroupHeartbeat`,  we check for topics in
`initializing` state and if they are a certain amount of time old, we
issue retry requests for the same.
* The interval for considering initializing topics as old was based of
`offsetsCommitTimeoutMs` and was not configurable.
* In this PR, we remedy the situation by introducing a new config to
supply the value. The default is `30_000` which is a
heuristic based on the fact that the share coordinator `persister`
retries request with exponential backoff, with upper cap of `30_000`
seconds.
* Tests have been updated wherever applicable.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Lan Ding
 <isDing_L@163.com>, TaiJuWu <tjwu1217@gmail.com>, Andrew Schofield
 <aschofield@confluent.io>
2025-07-09 09:52:58 +01:00
Lucas Brutschy a88fd01e74
KAFKA-19478 [1/N]: Precompute values in ProcessState (#20120)
This is a very mechanical and obvious change that is making most
accessors in ProcessState constant time O(1), instead of linear time
O(n), by computing the collections and aggregations at insertion time,
instead of every time the value is accessed.

Since the accessors are used in deeply nested loops, this reduces the
runtime of our worst case benchmarks by ~14x.

Reviewers: Bill Bejeck <bbejeck@apache.org>
2025-07-08 13:32:47 +02:00
Andrew Schofield da4fbba279
KAFKA-19468: Ignore unsubscribed topics when computing share assignment (#20101)
When the group coordinator is processing a heartbeat from a share
consumer, it must decide whether the recompute the assignment. Part of
this decision hinges on whether the assigned partitions match the
partitions initialised by the share coordinator. However, when the set
of subscribed topics changes, there may be initialised partitions which
are not currently assigned. Topics which are not subscribed should be
omitted from the calculation about whether to recompute the assignment.

Co-authored-by: Sushant Mahajan <smahajan@confluent.io>

Reviewers: Lan Ding <53332773+DL1231@users.noreply.github.com>, Sushant
 Mahajan <smahajan@confluent.io>, Apoorv Mittal
 <apoorvmittal10@gmail.com>
2025-07-04 14:55:19 +01:00
Andrew Schofield 860853dba2
KAFKA-19363: Finalize heterogeneous simple share assignor (#20074)
CI / build (push) Waiting to run Details
Finalise the share group SimpleAssignor for heterogeneous subscriptions.
The assignor code is much more accurate about the number of partitions
assigned to each member, and the number of members assigned for each
partition. It eliminates the idea of hash-based assignment because that
has been shown to the unhelpful. The revised code is very much more
effective at assigning evenly as the number of members grows and shrinks
over time.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
2025-07-04 10:35:31 +01:00
Sushant Mahajan 268cf664c3
KAFKA-19454: Handle topics missing in metadata in share delete. (#20090)
* There are instances where share group delete calls in group
coordinator (`onPartitionsDelete`, `deleteShareGroups`) where we lookup
the metadata image to fetch the topic id/partitions/topic name for a
topic name/id. However, there have been
instances where the looked up info was not found due to cluster being
under load or the underlying topic being deleted and information not
propagated correctly.
* To remedy the same, this PR adds checks to determine that topic is
indeed present in the image before the lookups thus preventing NPEs. The
problematic situations are logged.
* New tests have been added for `GroupMetadataManger` and
`GroupCoordinatorService`.

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-07-03 11:19:24 +01:00
Andrew Schofield 729f9ccf06
KAFKA-19440: Handle top-level errors in AlterShareGroupOffsets RPC (#20049)
While testing the code in https://github.com/apache/kafka/pull/19820, it
became clear that the error handling problems were due to the underlying
Admin API. This PR fixes the error handling for top-level errors in the
AlterShareGroupOffsets RPC.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Lan Ding
 <isDing_L@163.com>, TaiJuWu <tjwu1217@gmail.com>
2025-07-03 11:00:56 +01:00
Sushant Mahajan 28c53ba09a
KAFKA-19453: Ignore group not found in share group record replay. (#20076)
CI / build (push) Waiting to run Details
* When a `ShareGroup*` record is replayed in group
metadata manager, there is a call to check if the group exists. If the
group does not exist - we are throwing an exception which is
unnecessary.
* In this PR, we have added check to ignore this exception.
* New test to validate the logic has been added.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Dongnuo Lyu
<139248811+dongnuo123@users.noreply.github.com>
2025-07-02 10:10:14 +01:00
David Jacot 202e216a60
MINOR: Cleanup hasMemberSubscriptionChanged (#20047)
CI / build (push) Waiting to run Details
Cleanup hasMemberSubscriptionChanged.
Remove unused InvalidRegularExpressionException from the signature.

Reviewers: Sean Quah <squah@confluent.io>, Chia-Ping Tsai
 <chia7712@gmail.com>, TengYao Chi <frankvicky@apache.org>
2025-06-27 14:16:49 +08:00
David Jacot f6a78c4c2b
KAFKA-19246; OffsetFetch API does not return group level errors correctly with version 1 (#19704)
The OffsetFetch API does not support top level errors in version 1.
Hence, the top level error must be returned at the partition level.

Side note: It is a tad annoying that we create error response in
multiple places (e.g. KafkaApis, Group CoordinatorService). There were a
reason for this but I cannot remember.

Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Sean Quah <squah@confluent.io>, Ken Huang <s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>
2025-06-26 06:29:43 -07:00
Lucas Brutschy 23ddb1d8ac
MINOR: Reject requests using unsupported features in KIP-1071 (#20031)
CI / build (push) Waiting to run Details
KIP-1071 does not currently support all features planned in the KIP. We
should reject any requests that are using features that are currently
not implemented.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Matthias J. Sax
 <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
2025-06-25 14:48:56 +02:00
Rajini Sivaram 33a1648c44
MINOR: Fix response for consumer group describe with empty group id (#20030)
ConsumerGroupDescribe with an empty group id returns a response containing `null` groupId in a non-nullable field. Since the response cannot be serialized, this results in UNKNOWN_SERVER_ERROR being returned to the client. This PR sets the group id in the response to an empty string instead and adds request tests for empty group id.

Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2025-06-25 10:33:44 +01:00
Sushant Mahajan 815dd93e2f
MINOR: Invoke share group rebalance sensor. (#20006)
* The share group rebalance metric was not being invoked at the
appropriate group id bump position.
* This PR solves the issue.
* The metric name has been updated
(s/rebalance-rate/share-group-rebalance-rate,
s/rebalance-count/share-group-rebalance-count/)
* Updated tests in `GroupMetadataManagerTest` and
`GroupCoordinatorMetricsTest`

Reviewers: Andrew Schofield <aschofield@confluent.io>
2025-06-21 08:35:19 +01:00
Andrew Schofield 4690527fab
KAFKA-19362: Finalize homogeneous simple share assignor (#19977)
Finalise the share group SimpleAssignor for homogeneous subscriptions.
The assignor code is much more accurate about the number of partitions
assigned to each member, and the number of members assigned for each
partition. It eliminates the idea of hash-based assignment because that
has been shown to the unhelpful. The revised code is very much more
effective at assigning evenly as the number of members grows and shrinks
over time.

A future PR will address the code for heterogeneous subscriptions.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
2025-06-20 16:10:47 +01:00
Dongnuo Lyu af012e1ec2
KAFKA-18961: Time-based refresh for server-side RE2J regex (#19904)
Consumers can subscribe to an RE2J SubscriptionPattern that will be
resolved and maintained on the server-side (KIP-848). Currently, those
regexes are refreshed on the coordinator when a consumer subscribes to a
new regex, or if there is a new topic metadata image (to ensure regex
resolution stays up-to-date with existing topics)

But with
[KAFKA-18813](https://issues.apache.org/jira/browse/KAFKA-18813), the
topics matching a regex are filtered based on ACLs. This generates a new
situation, as regexes resolution do not stay up-to-date as topics become
visible (ACLs added/delete).

This patch introduces time-based refresh for the subscribed regex by
- Adding internal `group.consumer.regex.batch.refresh.max.interval.ms`
config
that controls the refresh interval.
- Schedule a regex refresh when updating regex subscription if the
latest refresh is older than the max interval.

Reviewers: David Jacot <djacot@confluent.io>
2025-06-12 04:54:39 -07:00
Jhen-Yung Hsu 2e968560e0
MINOR: Cleanup simplify set initialization with Set.of (#19925)
Simplify Set initialization and reduce the overhead of creating extra
collections.

The changes mostly include:
- new HashSet<>(List.of(...))
- new HashSet<>(Arrays.asList(...)) / new HashSet<>(asList(...))
- new HashSet<>(Collections.singletonList()) / new
HashSet<>(singletonList())
- new HashSet<>(Collections.emptyList())
- new HashSet<>(Set.of())

This change takes the following into account, and we will not change to
Set.of in these scenarios:
- Require `mutability` (UnsupportedOperationException).
- Allow `duplicate` elements (IllegalArgumentException).
- Allow `null` elements (NullPointerException).
- Depend on `Ordering`. `Set.of` does not guarantee order, so it could
make tests flaky or break public interfaces.

Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
 <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
2025-06-11 18:36:14 +08:00
PoAn Yang 844b0e651b
KAFKA-19369: Add group.share.assignors config and integration test (#19900)
CI / build (push) Waiting to run Details
* Add `group.share.assignors` config to `GroupCoordinatorConfig`.
* Send `rackId` in share group heartbeat request if it's not null.
* Add integration test `testShareConsumerWithRackAwareAssignor`.

Reviewers: Lan Ding <53332773+DL1231@users.noreply.github.com>, Andrew
 Schofield <aschofield@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
2025-06-06 14:20:56 +01:00
PoAn Yang e0adec5549
KAFKA-19290: Exploit mapKey optimisation in protocol requests and responses (wip) (#19815)
The mapKey optimisation can be used in some KIP-932 RPC schemas to
improve efficiency of some key-based accesses.

* AlterShareGroupOffsetsResponse
* ShareFetchRequest
* ShareFetchResponse
* ShareAcknowledgeRequest
* ShareAcknowledgeResponse

Reviewers: Andrew Schofield <aschofield@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
2025-06-06 14:19:08 +01:00
Lucas Brutschy 25bc5f2cfa
KAFKA-19372: StreamsGroup not subscribed to a topic when empty (#19901)
We should behave more like a consumer group and make sure to not be
subscribed to the input topics anymore when the last member leaves the
group. We don't do this right now because our topology is still
initialized even after the last member leaves the group.

This will allow:
* Offsets to expire and be cleaned up.
* Offsets to be deleted through admin API calls.

Reviewers: Bill Bejeck <bbejeck@apache.org>
2025-06-04 20:55:14 +02:00
Lucas Brutschy 678d456ad7
KAFKA-19044: Handle tasks that are not present in the current topology (#19722)
A heartbeat might be sent to the group coordinator, claiming to own
tasks that we do  not know about. We need some logic to handle those
requests. In KIP-1071, we propose  to return `INVALID_REQUEST` error
whenever this happens, effectively letting the  clients crash.

This behavior will, however, make topology updates impossible. Bruno
Cadonna proposed  to only check that owned tasks match our set of
expected tasks if the topology epochs  between the group and the client
match. The aim of this change is to implement a  check and a behavior
for the first version of the protocol, which is to always  return
`INVALID_REQUEST` if an unknown task is sent to the group coordinator.

We can relax this constraint once we allow topology updating with
topology epochs.

To efficiently check this whenever we receive a heartbeat containing
tasks, we precompute the number of tasks for each subtopology. This also
benefits the performance of the assignor.

Reviewers: Bill Bejeck <bbejeck@apache.org>
2025-06-04 20:22:52 +02:00
PoAn Yang 2977cb17d0
KAFKA-17747: [6/N] Replace subscription metadata with metadata hash in share group (#19796)
* Use metadata hash to replace subscription metadata.
* Remove `ShareGroupPartitionMetadataKey` and
`ShareGroupPartitionMetadataValue`.
* Use `subscriptionTopicNames` and `metadataImage` to replace
`subscriptionMetadata` in `subscribedTopicsChangeMap` function.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot
<djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
2025-06-03 16:30:39 +01:00
Andrew Schofield 016a4a6c4c
KAFKA-19353: Upgrade note and initial docs for KIP-932 (#19863)
CI / build (push) Waiting to run Details
This is the initial documentation for KIP-932 preview in AK 4.1. The aim
is to get very minimal docs in before the cutoff. Longer term, more
comprehensive documentation will be provided for AK 4.2.

The PR includes:
* Generation of group-level configuration documentation
* Add link to KafkaShareConsumer to API docs
* Add a summary of share group rational to design docs
* Add basic operations information for share groups to ops docs
* Add upgrade note describing arrival of KIP-932 preview in 4.1

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>

---------

Co-authored-by: Apoorv Mittal <apoorvmittal10@gmail.com>
2025-06-03 13:23:11 +01:00
PoAn Yang 425f028556
KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (#19802)
* Use metadata hash to replace subscription metadata.
* Remove `StreamsGroupPartitionMetadataKey` and
`StreamsGroupPartitionMetadataValue`.
* Check whether `configuredTopology` is empty. If it's, call
`InternalTopicManager.configureTopics` and set the result to the group.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
2025-06-03 13:21:34 +02:00
Apoorv Mittal a70a667e95
MINOR: Fixing logs and adding exception in response (#19859)
PR streamlines the logs when delete share group or offset is triggered.
Also fixes the response when group is not found while deleting share
group.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Sushant Mahajan
 <smahajan@confluent.io>
2025-06-02 15:10:51 +01:00
Sean Quah 8323168b57
MINOR: Minor updates to RangeSet (#19678)
Minor updates to RangeSet:   * Disallow ranges with negative size   *
Disallow ranges with more than Integer.MAX_VALUE elements   * Fix
equals() so that all empty RangeSets are equal, to follow the Set
interface definition better.   * Reimplement hashCode() to follow the
Set interface definition.

Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
2025-05-31 02:31:51 +08:00
Sushant Mahajan 383a9ff9df
KAFKA-19344: Replace desc.assignablePartitions with spec.isPartitionAssignable. (#19838)
CI / build (push) Waiting to run Details
- A new method `assignablePartitions` was added to the
`SubscribedTopicDescriber`in https://github.com/apache/kafka/pull/19026.
This method was required for computing assignments for share groups
(KIP-932).
- However, since the describer is a public interface and is used to
encapsulate methods which return all subscribed partitions (KIP-848),
`assignablePartitions` is deemed inconsistent with this interface.
- Hence, this PR extends the `GroupSpec` interface to add a method
`isPartitionAssignable` which will serve the same purpose. The
`assignablePartitions` has been removed from the describer.
- Tests have been updated for the assigners and spec and removed from
describer as required.

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot
 <djacot@confluent.io>
2025-05-28 20:27:29 +01:00
PoAn Yang d6ee83a893
KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group (#19761)
* Add `topicHashCache` to `GroupMetadataManager`.
* Remove topic hash from cache if related topic image is updated.
* Ignore topic hash 0 when calculating group metadata hash.
* Add `metadataHash` to `ModernGroup`.
* Replace subscription metadata with metadata hash.
* If there is data in `ConsumerGroupPartitionMetadataValue`, set a flag
in group to add tombstone record in next group heartbeat.

Reviewers: David Jacot <djacot@confluent.io>
2025-05-28 05:56:46 -07:00
Alieh Saeedi a3d5ca07f8
MINOR: Change `Streams group` to `streams group` (#19813)
As of https://github.com/apache/kafka/pull/19758#discussion_r2097734386,
the capitalization across all messages are aligned.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Andrew Schofield
 <aschofield@confluent.io>
2025-05-28 09:56:29 +02:00
David Jacot 25031373da
KAFKA-19154; Offset Fetch API should return INVALID_OFFSET if requested topic id does not match persisted one (#19744)
CI / build (push) Waiting to run Details
This patch updates the OffsetFetch API to ensure that a committed offset
is returned iff the requested topic id matches the persisted one; the
invalid offset is returned otherwise.

Reviewers: Lianet Magrans <lmagrans@confluent.io>
2025-05-27 16:15:06 +02:00
Dongnuo Lyu fcb722dc88
KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group (#19790)
CI / build (push) Waiting to run Details
When a consumer protocol static member replaces an existing member in a
classic group, it's not necessary to recompute the assignment. However,
it happens anyway.

In

[ConsumerGroup.fromClassicGroup](0ff4dafb7d/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java (L1140)),
we don't set the group's subscriptionMetadata.  Later in the consumer
group heartbeat, we [call

updateSubscriptionMetadata](0ff4dafb7d/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java (L1748)),
which [notices that the group's subscriptionMetadata needs an

update](0ff4dafb7d/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java (L2757))
and bumps the epoch. Since the epoch is bumped, we [recompute the

assignment](0ff4dafb7d/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java (L1766)).

As a fix, this patch sets the subscriptionMetadata in
ConsumerGroup.fromClassicGroup.

Reviewers: Sean Quah <squah@confluent.io>, David Jacot <djacot@confluent.io>
2025-05-27 02:25:57 -07:00
Jhen-Yung Hsu 651f86b77e
MINOR: Remove unused mkMapOfPartitionRacks method (#19797)
The mkMapOfPartitionRacks in ServerSideAssignorBenchmark.java was
introduced in 8013657f5d,  and the one in
GroupCoordinatorRecordHelpersTest.java was introduced in
3709901c9e.

Both have not been used since bb97d63d41.

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
2025-05-26 02:54:17 +08:00
Sushant Mahajan af4d048da6
MINOR: Bugfix in GroupMetadataManager.testShareGroupInitializeSuccess. (#19795)
Currently, we were asserting on records containing set using
`assertEquals` which can fail intermittently. To fix the assertion has
been replaced by `assertRecordEquals`.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai
 <chia7712@gmail.com>
2025-05-23 23:54:38 +08:00
Ming-Yen Chung e107e69a51
HOTFIX: Fix GroupMetadataManager#completeAlterShareGroupOffsets to use InitMapValue in addInitializingTopicsRecords (#19792)
https://github.com/apache/kafka/pull/19781/files#diff-00f0f81cf13e66781777d94f7d2e68a581663385c37e98792507f2294c91bb09L2746-R2745
changes the `addInitializingTopicsRecords` signature while
https://github.com/apache/kafka/pull/18929/files#r2104172356 didn't make
adjustment accordingly.

Fix GroupMetadataManager#completeAlterShareGroupOffsets to use
`InitMapValue` in `initializingTopics` so  that
`addInitializingTopicsRecords` can accept it.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Sushant Mahajan <sushant.mahajan88@gmail.com>, Kuan-Po Tseng <brandboat@gmail.com>, TengYao Chi <frankvicky@apache.org>, Shivsundar R <shr@confluent.io>, PoAn Yang <payang@apache.org>, Nick Guo <lansg0504@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>
2025-05-23 13:07:37 +01:00
jimmy b44bfca408
KAFKA-16717 [2/N]: Add AdminClient.alterShareGroupOffsets (#18929)
[KAFKA-16720](https://issues.apache.org/jira/browse/KAFKA-16720) aims to
finish the AlterShareGroupOffsets RPC.

Reviewers: Andrew Schofield <aschofield@confluent.io>

---------

Co-authored-by: jimmy <wangzhiwang@qq.com>
2025-05-23 09:05:48 +01:00