Commit Graph

442 Commits

Author SHA1 Message Date
Apoorv Mittal 96816a3ed7
KAFKA-16742: Add share group describe in coordinator (KIP-932) (#16797)
Share group describe functionality for KIP-932

Reviewers:  Andrew Schofield <aschofield@confluent.io>,  Manikumar Reddy <manikumar.reddy@gmail.com>
2024-08-07 14:53:01 +05:30
Chirag Wadhwa 1db84c1a11
KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests (#16456)
Implemented handleShareFetch request RPC in KafkaApis.scala. This method is called whenever the client sends a Share Fetch request to the broker. Although Share Fetch request support acknowledgements, since the logic for acknowledging records is not completely implemented in SharePartitionManager.java class, this method currently includes placeholder code for acknowledging, which will be replaced by the actual functionality in the upcoming PRs.

Reviewers:  Apoorv Mittal <apoorvmittal10@gmail.com>, Abhinav Dixit <adixit@confluent.io>, Jun Rao <junrao@gmail.com>
2024-08-06 07:59:04 -07:00
Apoorv Mittal 0b6086ed88
KAFKA-16741: Add ShareGroupHeartbeat API support - 2/N (KIP-932) (#16573)
ShareGroupHeartbeat API support as defined in KIP-932. The heartbeat persists Group and Member information on __consumer_offsets topic.

The PR also moves some of the ShareGroupConfigs to GroupCoordinatorConfigs as they should only be used in group coordinator.


Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
2024-07-15 16:14:55 +05:30
Kuan-Po Tseng a533e246e3
KAFKA-17081 Tweak GroupCoordinatorConfig: re-introduce local attributes and validation (#16524)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-07-08 01:15:18 +08:00
TaiJuWu 72a47cc07b
Fix compilation error (#16526)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Chris Egerton <fearthecellos@gmail.com>
2024-07-04 16:48:55 -04:00
Ritika Reddy 42f267a853
KAFKA-16944; Rewrite Range Assignor (#16504)
The server side range assignor was made to be sticky i.e. partitions from the existing assignment are retained as much as possible. During a rebalance, the expected behavior is to achieve co-partitioning for members that are subscribed to the same set of topics with equal number of partitions.

However, there are cases where this cannot be achieved efficiently with the current algorithm. There is no easy way to implement stickiness and co-partitioning and hence we have resorted to recomputing the target assignment every time.

In case of static membership, instanceIds are leveraged to ensure some form of stickiness.

```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score    Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS           100  avgt    5    0.052 ±  0.001  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS          1000  avgt    5    0.454 ±  0.003  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS           100  avgt    5    0.476 ±  0.046  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS          1000  avgt    5    3.102 ±  0.055  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS           100  avgt    5    5.640 ±  0.223  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS          1000  avgt    5   37.947 ±  1.000  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10       HETEROGENEOUS           100  avgt    5    0.172 ±  0.001  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10       HETEROGENEOUS          1000  avgt    5    1.882 ±  0.006  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10       HETEROGENEOUS           100  avgt    5    1.730 ±  0.036  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10       HETEROGENEOUS          1000  avgt    5   17.654 ±  1.160  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS           100  avgt    5   18.595 ±  0.316  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS          1000  avgt    5  172.398 ±  2.251  ms/op
JMH benchmarks done

Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt   Score   Error  Units
TargetAssignmentBuilderBenchmark.build                  100                         10           100  avgt    5   0.071 ± 0.004  ms/op
TargetAssignmentBuilderBenchmark.build                  100                         10          1000  avgt    5   0.428 ± 0.026  ms/op
TargetAssignmentBuilderBenchmark.build                 1000                         10           100  avgt    5   0.659 ± 0.028  ms/op
TargetAssignmentBuilderBenchmark.build                 1000                         10          1000  avgt    5   3.346 ± 0.102  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5   8.947 ± 0.386  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  40.240 ± 3.113  ms/op
JMH benchmarks done
```

Reviewers: David Jacot <djacot@confluent.io>
2024-07-04 10:33:09 -07:00
Apoorv Mittal e0dcfa7b51
KAFKA-16741: Add share group classes for Heartbeat API (1/N) (KIP-932) (#16516)
Defined share group, member and sinmple assignor classes with API definition for Share Group Heartbeat and Describe API.

The ShareGroup and ShareGroupMember extends the common ModernGroup and ModernGroupMember respectively.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
2024-07-04 20:31:47 +05:30
David Jacot efe7ccaf77
KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes (#16498)
The group coordinator has (internal) write operations that could generate a large number of records (e.g. expiring offsets and groups). At the moment, those operations are limited by the maximum message size. If they hit it, they are basically stuck forever. This patch extends the CoordinatorRuntime to support non-atomic writes and it changes those internal operations to be non-atomic.

Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-07-04 04:51:08 -07:00
Apoorv Mittal f2dbc55d24
KAFKA-17047: Refactored group coordinator classes to modern package (KIP-932) (#16474)
Following the discussion and suggestion by @dajac, https://github.com/apache/kafka/pull/16054#discussion_r1613638293, the PR refactors the common classes to build TargetAssignment in `modern` package. `consumer` package has been moved inside `modern` package with classes exclusive to `consumer group`.

This PR completes the refactoring and base to introduce `share` package inside `modern`. The subsequent PRs will define the implementation specific to Share Groups while re-using the common functionality from `modern` package classes. 

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
2024-07-03 00:16:40 -07:00
Kuan-Po (Cooper) Tseng 206d0f809a
KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig (#16458)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-07-01 23:31:53 +08:00
David Jacot 9a78122fb0
MINOR: Refactor GroupMetadataManager#consumerGroupHeartbeat and GroupMetadataManager#classicGroupJoinToConsumerGroup (#16371)
This patch is an attempt to simplifying GroupMetadataManager#consumerGroupHeartbeat and GroupMetadataManager#classicGroupJoinToConsumerGroup by sharing more of the common logic. It slightly change how static members are replaced too. Now, we generate the records to replace the member and then we update the member if needed.

Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Justine Olshan <jolshan@confluent.io>
2024-06-30 23:16:52 -07:00
Apoorv Mittal 60114a46a7
KAFKA-16822: Abstract consumer group to share functionality with share group (KIP-932) (#16054)
Abstracted code for 2 classes `ConsumerGroup` and `ConsumerGroupMember` to `ModernGroup` and `ModernGroupMember` respectively. The new abstract classes are created to share common functionality with `ShareGroup` and `ShareGroupMember` which are being introduced with KIP-932.

The patch is majorly code refactoring from existing classes to abstract classes. Also created a new package called `modern` where `MemberState` class is moved, in upcoming patches, I will move common classes for `Share` and `Consumer` Group in `modern` package itself. 

Reviewers: Lianet Magrans <lianetmr@gmail.com>, Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
2024-06-27 05:42:58 -07:00
David Jacot ee550c4b77
KAFKA-16973; Fix caught-up condition (#16367)
When a write operation does not have any records, the coordinator runtime checked whether the state machine is caught-up to decide whether the operation should wait until the state machine is committed up to the operation point or the operation should be completed. The current implementation assumes that there will always be a pending write operation waiting in the deferred queue when the state machine is not fully caught-up yet. This is true except when the state machine is just loaded and not caught-up yet.

This patch fixes the issue by always comparing the last written offset and the last committed offset.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-20 00:53:48 -07:00
Dongnuo Lyu 21d60eabab
KAFKA-16673; Simplify `GroupMetadataManager#toTopicPartitions` by using `ConsumerProtocolSubscription` instead of `ConsumerPartitionAssignor.Subscription` (#16309)
In `GroupMetadataManager#toTopicPartitions`, we generate a list of `ConsumerGroupHeartbeatRequestData.TopicPartitions` from the input deserialized subscription. Currently the input subscription is `ConsumerPartitionAssignor.Subscription`, where the topic partitions are stored as (topic-partition) pairs, whereas in `ConsumerGroupHeartbeatRequestData.TopicPartitions`, we need the topic partitions to be stored as (topic-partition list) pairs.

`ConsumerProtocolSubscription` is an intermediate data structure in the deserialization where the topic partitions are stored as (topic-partition list) pairs. This pr uses `ConsumerProtocolSubscription` instead as the input subscription to make `toTopicPartitions` more efficient. 

Reviewers: David Jacot <djacot@confluent.io>
2024-06-17 02:47:52 -07:00
Omnia Ibrahim e99da2446c
KAFKA-15853: Move KafkaConfig.configDef out of core (#16116)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-06-14 17:26:00 +02:00
gongxuanzhang 6d9ef0e12a
KAFKA-10787 Apply spotless to `group-coordinator` and `group-coordinator-api` (#16298)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-14 12:46:28 +08:00
Dongnuo Lyu 11c85a93c3
MINOR: Make online downgrade failure logs less noisy and update the timeouts scheduled in `convertToConsumerGroup` (#16290)
This patch: 
- changes the order of the checks in `validateOnlineDowngrade`, so that only when the last member using the consumer protocol leave and the group still has classic member(s), `online downgrade is disabled` is logged if the policy doesn't allow downgrade.
- changes the session timeout in `convertToConsumerGroup` from `consumerGroupSessionTimeoutMs` to `member.classicProtocolSessionTimeout().get()`.

Reviewers: David Jacot <djacot@confluent.io>
2024-06-13 02:11:01 -07:00
gongxuanzhang 596b945072
KAFKA-16643 Add ModifierOrder checkstyle rule (#15890)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-13 15:39:32 +08:00
David Jacot 638844f833
KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215)
This patch is the continuation of https://github.com/apache/kafka/pull/15964. It introduces the records coalescing to the CoordinatorRuntime. It also introduces a new configuration `group.coordinator.append.linger.ms` which allows administrators to chose the linger time or disable it with zero. The new configuration defaults to 10ms.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-06-11 23:29:50 -07:00
David Jacot 98f7da9172
KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws NPE when one member has no subscriptions (#16283)
Fix the following NPE:

```
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.coordinator.group.assignor.MemberAssignment.targetPartitions()" because the return value of "java.util.Map.get(Object)" is null
	at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.canMemberParticipateInReassignment(GeneralUniformAssignmentBuilder.java:248)
	at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.balance(GeneralUniformAssignmentBuilder.java:336)
	at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.buildAssignment(GeneralUniformAssignmentBuilder.java:157)
	at org.apache.kafka.coordinator.group.assignor.UniformAssignor.assign(UniformAssignor.java:84)
	at org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.build(TargetAssignmentBuilder.java:302)
	at org.apache.kafka.coordinator.group.GroupMetadataManager.updateTargetAssignment(GroupMetadataManager.java:1913)
	at org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:1518)
	at org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:2254)
	at org.apache.kafka.coordinator.group.GroupCoordinatorShard.consumerGroupHeartbeat(GroupCoordinatorShard.java:308)
	at org.apache.kafka.coordinator.group.GroupCoordinatorService.lambda$consumerGroupHeartbeat$0(GroupCoordinatorService.java:298)
	at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.lambda$run$0(CoordinatorRuntime.java:769)
	at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.withActiveContextOrThrow(CoordinatorRuntime.java:1582)
	at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.access$1400(CoordinatorRuntime.java:96)
	at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.run(CoordinatorRuntime.java:767)
	at org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.handleEvents(MultiThreadedEventProcessor.java:144)
	at org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.run(MultiThreadedEventProcessor.java:176) 
```

Reviewers: Lianet Magrans <lianetmr@gmail.com>, Justine Olshan <jolshan@confluent.io>
2024-06-11 11:43:56 -07:00
David Jacot 049cfeac02
MINOR: Rename uniform assignor's internal builders (#16233)
This patch renames the uniform assignor's builders to match the `SubscriptionType` which is used to determine which one is called. It removes the abstract class `AbstractUniformAssignmentBuilder` which is not necessary anymore. It also applies minor refactoring.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-06-10 05:26:56 -07:00
David Jacot 7d832cf74f
KAFKA-14701; Move `PartitionAssignor` to new `group-coordinator-api` module (#16198)
This patch moves the `PartitionAssignor` interface and all the related classes to a newly created `group-coordinator/api` module, following the pattern used by the storage and tools modules.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-06-06 12:19:20 -07:00
Dongnuo Lyu 7ddfa64759
MINOR: Adjust validateOffsetCommit/Fetch in ConsumerGroup to ensure compatibility with classic protocol members (#16145)
During online migration, there could be ConsumerGroup that has members that uses the classic protocol. In the current implementation, `STALE_MEMBER_EPOCH` could be thrown in ConsumerGroup offset fetch/commit validation but it's not supported by the classic protocol. Thus this patch changed `ConsumerGroup#validateOffsetCommit` and `ConsumerGroup#validateOffsetFetch` to ensure compatibility.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
2024-06-04 23:08:38 -07:00
Ritika Reddy 078dd9a311
KAFKA-16821; Member Subscription Spec Interface (#16068)
This patch reworks the `PartitionAssignor` interface to use interfaces instead of POJOs. It mainly introduces the `MemberSubscriptionSpec` interface that represents a member subscription and changes the `GroupSpec` interfaces to expose the subscriptions and the assignments via different methods.

The patch does not change the performance.

before:
```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  3.462 ± 0.687  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  3.626 ± 0.412  ms/op
JMH benchmarks done
```

after:
```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  3.677 ± 0.683  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  3.991 ± 0.065  ms/op
JMH benchmarks done
```

Reviewers: David Jacot <djacot@confluent.io>
2024-06-04 06:44:37 -07:00
David Jacot 7d82f7625e
MINOR: Log time taken to compute the target assignment (#16185)
The time taken to compute a new assignment is critical. This patches extending the existing logging to log it too. This is very useful information to have.

Reviewers: Luke Chen <showuon@gmail.com>
2024-06-04 06:38:56 -07:00
Jeff Kim d7bc43ed06
KAFKA-16664; Re-add EventAccumulator.poll(long, TimeUnit) (#16144)
We have revamped the thread idle ratio metric in https://github.com/apache/kafka/pull/15835. https://github.com/apache/kafka/pull/15835#discussion_r1588068337 describes a case where the metric loses accuracy and in order to set a lower bound to the accuracy, this patch re-adds a poll with a timeout that was removed as part of https://github.com/apache/kafka/pull/15430.

Reviewers: David Jacot <djacot@confluent.io>
2024-06-03 23:27:35 -07:00
TingIāu "Ting" Kì 7973aa6a39
KAFKA-16861: Don't convert to group to classic if the size is larger than group max size. (#16163)
Fix the bug where the group downgrade to a classic one when a member leaves, even though the consumer group size is still larger than `classicGroupMaxSize`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
2024-06-03 11:36:07 -07:00
David Jacot 979f8d9aa3
MINOR: Small refactor in TargetAssignmentBuilder (#16174)
This patch is a small refactoring which mainly aims at avoid to construct a copy of the new target assignment in the TargetAssignmentBuilder because the copy is not used by the caller. The change relies on the exiting tests and it does not really have an impact on performance (e.g. validated with TargetAssignmentBuilderBenchmark).

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-06-03 11:32:39 -07:00
David Jacot fb566e48bf
KAFKA-16864; Optimize uniform (homogenous) assignor (#16088)
This patch optimizes uniform (homogenous) assignor by avoiding creating a copy of all the assignments. Instead, the assignor creates a copy only if the assignment is updated. It is a sort of copy-on-write. This change reduces the overhead of the TargetAssignmentBuilder when ran with the uniform (homogenous) assignor.

Trunk:

```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt   Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  24.535 ± 1.583  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  24.094 ± 0.223  ms/op
JMH benchmarks done
```

```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt   Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS           100  avgt    5  14.697 ± 0.133  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5  15.073 ± 0.135  ms/op
JMH benchmarks done
```

Patch:

```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  3.376 ± 0.577  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  3.731 ± 0.359  ms/op
JMH benchmarks done
```

```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt  Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS           100  avgt    5  1.975 ± 0.086  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5  2.026 ± 0.190  ms/op
JMH benchmarks done
```

Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-05-31 13:17:59 -07:00
Andrew Schofield 2d9994e0de
KAFKA-16722: Introduce ConsumerGroupPartitionAssignor interface (#15998)
KIP-932 introduces share groups to go alongside consumer groups. Both kinds of group use server-side assignors but it is unlikely that a single assignor class would be suitable for both. As a result, the KIP introduces specific interfaces for consumer group and share group partition assignors.

This PR introduces only the consumer group interface, `o.a.k.coordinator.group.assignor.ConsumerGroupPartitionAssignor`. The share group interface will come in a later release. The existing implementations of the general `PartitionAssignor` interface have been changed to implement `ConsumerGroupPartitionAssignor` instead and all other code changes are just propagating the change throughout the codebase.

Note that the code in the group coordinator that actually calculates assignments uses the general `PartitionAssignor` interface so that it can be used with both kinds of group, even though the assignors themselves are specific.

Reviewers: Apoorv Mittal <amittal@confluent.io>, David Jacot <djacot@confluent.io>
2024-05-29 08:31:52 -07:00
Dongnuo Lyu eefd114c4a
KAFKA-16832; LeaveGroup API for upgrading ConsumerGroup (#16057)
This patch implements the LeaveGroup API to the consumer groups that are in the mixed mode.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
2024-05-28 23:21:30 -07:00
Ritika Reddy a8d166c00e
KAFKA-16625; Reverse lookup map from topic partitions to members (#15974)
This patch speeds up the computation of the unassigned partitions by exposing the inverted target assignment. It allows the assignor to check whether a partition is assigned or not.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
2024-05-25 09:06:15 -07:00
Jeff Kim d585a494a4
KAFKA-16831: CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit (#16059)
CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit. Otherwise, we default the write limit to the min buffer size of 16384 for the write limit. This causes the coordinator to threw RecordTooLargeException even when it's under the 1MB max batch size limit.

Reviewers: David Jacot <djacot@confluent.io>
2024-05-24 13:33:57 -07:00
Jeff Kim 520aa8665c
KAFKA-16626; Lazily convert subscribed topic names to topic ids (#15970)
This patch aims to remove the data structure that stores the conversion from topic names to topic ids which was taking time similar to the actual assignment computation. Instead, we reuse the already existing ConsumerGroupMember.subscribedTopicNames() and do the conversion to topic ids when the iterator is requested.

Reviewers: David Jacot <djacot@confluent.io>
2024-05-24 00:51:50 -07:00
Dongnuo Lyu 14b5c4d1e8
KAFKA-16793; Heartbeat API for upgrading ConsumerGroup (#15988)
This patch implements the heartbeat api to the members that use the classic protocol in a ConsumerGroup.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
2024-05-22 23:27:00 -07:00
Jeff Kim e692feed34
MINOR: fix flaky testRecordThreadIdleRatio (#15987)
DelayEventAccumulator should return immediately if there are no events in the queue. Also removed some unused fields inside EventProcessorThread.

Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
2024-05-22 23:24:23 -07:00
Mickael Maison affe8da54c
KAFKA-7632: Support Compression Levels (KIP-390) (#15516)
Reviewers: Jun Rao <jun@confluent.io>,  Luke Chen <showuon@gmail.com>
Co-authored-by: Lee Dongjin <dongjin@apache.org>
2024-05-21 17:58:49 +02:00
David Jacot b4c2d66801
KAFKA-16770; [1/N] Coalesce records into bigger batches (#15964)
We have discovered during large scale performance tests that the current write path of the new coordinator does not scale well. The issue is that each write operation writes synchronously from the coordinator threads. Coalescing records into bigger batches helps drastically because it amortizes the cost of writes. Aligning the batches with the snapshots of the timelines data structures also reduces the number of in-flight snapshots.

This patch is the first of a series of patches that will bring records coalescing into the coordinator runtime. As a first step, we had to rework the PartitionWriter interface and move the logic to build MemoryRecords from it to the CoordinatorRuntime. The main changes are in these two classes. The others are related mechanical changes.

Reviewers: Justine Olshan <jolshan@confluent.io>
2024-05-20 23:47:09 -07:00
David Jacot 5b34574e86
MINOR: Refactor write timeout in CoordinatorRuntime (#15976)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-05-18 01:47:44 +08:00
Dongnuo Lyu b8c96389b4
KAFKA-16762: SyncGroup API for upgrading ConsumerGroup (#15954)
This patch implements the sync group api for the consumer groups that are in the mixed mode. In classicGroupSyncToConsumerGroup, the assignedPartitions calculated in the JoinGroup will be returned as the assignment in the sync response and the member session timeout will be rescheduled.

Reviewers: David Jacot <djacot@confluent.io>
2024-05-17 07:12:40 -07:00
David Jacot ffb31e172a
MINOR: Remove usage of Stream API in CoordinatorRecordHelpers (#15969)
This patch removes the usage of the Stream API in CoordinatorRecordHelpers. I saw it in a couple of profiles so it is better to remove it.

Reviewers: Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-05-15 23:13:55 -07:00
David Jacot bf88013a28
MINOR: Rename `Record` to `CoordinatorRecord` (#15949)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-05-15 13:57:19 +08:00
Dongnuo Lyu 0e023e1f73
MINOR: Add classic member session timeout to ClassicMemberMetadata (#15921)
The heartbeat api to the consumer group with classic protocol members schedules the session timeout. At present, there's no way to get the classic member session timeout in heartbeat to consumer group.

This patch stores the session timeout into the ClassicMemberMetadata in ConsumerGroupMemberMetadataValue and update it when it's provided in the join request.

Reviewers: David Jacot <djacot@confluent.io>
2024-05-14 20:41:20 +02:00
Jeff Kim df5735dda5
MINOR: fix flaky testRecordThreadIdleRatioTwoThreads test (#15937)
Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-05-14 23:20:36 +08:00
Ritika Reddy ccd83cafea
KAFKA-16694; Remove Rack Awareness Code from the Server Side Assignors (#15903)
Reviewers: David Jacot <djacot@confluent.io>
2024-05-14 00:13:35 -07:00
David Jacot f9169b7d3a
KAFKA-16735; Deprecate offsets.commit.required.acks (#15931)
This patch deprecates `offsets.commit.required.acks` in Apache Kafka 3.8 as described in KIP-1041: https://cwiki.apache.org/confluence/x/9YobEg.

Reviewers: Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-05-13 11:30:34 -07:00
Ritika Reddy ee16eee5de
KAFKA-16587: Add subscription model information to group state (#15785)
This patch introduces the SubscriptionType to the group state and passes it along to the partition assignor. A group is "homogeneous" when all the members are subscribed to the same topics; or it is "heterogeneous" otherwise. This mainly helps the uniform assignor because it does not have to re-compute this information to determine which algorithm to use.

trunk:
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionModel)  (topicCount)  Mode  Cnt    Score    Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10          HOMOGENEOUS           100  avgt    5    0.136 ±  0.001  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10          HOMOGENEOUS          1000  avgt    5    0.198 ±  0.002  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10          HOMOGENEOUS           100  avgt    5    1.767 ±  0.138  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10          HOMOGENEOUS          1000  avgt    5    1.540 ±  0.020  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10          HOMOGENEOUS           100  avgt    5   32.419 ±  7.173  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10          HOMOGENEOUS          1000  avgt    5   26.731 ±  1.985  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false            100                         10          HOMOGENEOUS           100  avgt    5    0.242 ±  0.006  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false            100                         10          HOMOGENEOUS          1000  avgt    5    1.002 ±  0.006  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false           1000                         10          HOMOGENEOUS           100  avgt    5    2.544 ±  0.168  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false           1000                         10          HOMOGENEOUS          1000  avgt    5   10.749 ±  0.207  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10          HOMOGENEOUS           100  avgt    5   26.832 ±  0.154  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10          HOMOGENEOUS          1000  avgt    5  106.209 ±  0.301  ms/op
JMH benchmarks done

patch:
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt   Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS           100  avgt    5   0.131 ± 0.001  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS          1000  avgt    5   0.185 ± 0.004  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS           100  avgt    5   1.943 ± 0.091  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS          1000  avgt    5   1.450 ± 0.139  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS           100  avgt    5  30.803 ± 2.644  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS          1000  avgt    5  24.251 ± 1.230  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false            100                         10         HOMOGENEOUS           100  avgt    5   0.155 ± 0.004  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false            100                         10         HOMOGENEOUS          1000  avgt    5   0.235 ± 0.010  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false           1000                         10         HOMOGENEOUS           100  avgt    5   1.602 ± 0.046  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false           1000                         10         HOMOGENEOUS          1000  avgt    5   1.901 ± 0.174  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS           100  avgt    5  16.098 ± 1.905  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5  17.681 ± 0.174  ms/op
JMH benchmarks done

Reviewers: David Jacot <djacot@confluent.io>
2024-05-13 02:19:05 -07:00
Jeff Kim 8a9dd2beda
KAFKA-16663; Cancel write timeout TimerTask on successful event completion (#15902)
Write events create and add a TimerTask to schedule the timeout operation. The issue is that we pile up the number of timer tasks which are essentially no-ops if replication was successful. They stay in memory for 15 seconds (default write timeout) and as the rate of write increases, the impact on memory usage increases.

Instead, cancel the corresponding write timeout task when the write event is committed to the log. This also applies to complete transaction events.

Reviewers: David Jacot <djacot@confluent.io>
2024-05-13 00:18:32 -07:00
Jeff Kim 21bf715622
KAFKA-16307; Fix coordinator thread idle ratio (#15835)
This PR fixes the thread idle ratio. We take a similar approach to the kafka request handler idle ratio: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L108-L117

Instead of calculating the actual ratio per thread, we record the time each thread stays idle while waiting for a new event, divided by the number of threads as an approximation.

Reviewers: David Jacot <djacot@confluent.io>
2024-05-07 06:21:09 -07:00
Dongnuo Lyu 459eaec666
KAFKA-16615; JoinGroup API for upgrading ConsumerGroup (#15798)
The patch implements JoinGroup API for the new consumer groups. It allow members using the classic rebalance protocol with the consumer embedded protocol to join a new consumer group.

Reviewers: David Jacot <djacot@confluent.io>
2024-05-06 23:59:10 -07:00
David Jacot 42754336e1
MINOR: Remove `ConsumerGroupPartitionMetadataValue.Epoch` field (#15854)
ConsumerGroupPartitionMetadataValue.Epoch is not used anywhere so we can remove it. Note that we already have non-backward compatible changes lined up for 3.8 so it is fine to do it.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-05-06 05:02:39 -07:00
Chia Chuan Yu 55a00be4e9
MINOR: Replaced Utils.join() with JDK API. (#15823)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-05-06 15:13:01 +08:00
David Jacot 2c0b8b6920
MINOR: ConsumerGroup#getOrMaybeCreateMember should not add the member to the group (#15847)
While reviewing https://github.com/apache/kafka/pull/15785, I noticed that the member is added to the group directly in `ConsumerGroup#getOrMaybeCreateMember`. This does not hurt but confuses people because the state must not be mutated at this point. It should only be mutated when records are replayed. I think that it is better to remove it in order to make it clear.

Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2024-05-03 06:24:26 -07:00
Dongnuo Lyu 1e8415160f
MINOR: Add replayRecords to CoordinatorResult (#15818)
The patch adds a boolean attribute `replayRecords` that specifies whether the records should be replayed.

Reviewers: David Jacot <djacot@confluent.io>
2024-04-30 09:14:02 -07:00
Dongnuo Lyu 994077e43e
MINOR: Fix the flaky testConsumerGroupHeartbeatWithStableClassicGroup by sorting the topic partition list (#15816)
We are seeing flaky test in `testConsumerGroupHeartbeatWithStableClassicGroup` where the error is caused by the different ordering in the expected and actual values. The patch sorts the topic partition list in the records to fix the issue.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Igor Soarez <soarez@apple.com>, David Jacot <djacot@confluent.io>
2024-04-29 00:43:49 -07:00
Dongnuo Lyu dc1d8fc330
KAFKA-16554: Online downgrade triggering and group type conversion (#15721)
Online downgrade from a consumer group to a classic group is triggered when the last consumer that uses the consumer protocol leaves the group. A rebalance is manually triggered after the group conversion. This patch adds consumer group downgrade validation and conversion.

Reviewers: David Jacot <djacot@confluent.io>
2024-04-25 07:44:25 -07:00
Omnia Ibrahim 363f4d2847
KAFKA-15853 Move consumer group and group coordinator configs out of core (#15684)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-17 20:41:22 +08:00
Dongnuo Lyu a9f65a5d7f
KAFKA-16436; Online upgrade triggering and group type conversion (#15662)
This patch introduces the conversion from a classic group to a consumer group when a member joins with the new consumer group protocol (epoch is 0) but only if the conversion is enabled.

Reviewers: David Jacot <djacot@confluent.io>
2024-04-17 04:57:44 -07:00
Dongnuo Lyu 619f27015f
KAFKA-16294: Add group protocol migration enabling config (#15411)
This patch adds the `group.consumer.migration.policy` config which controls how consumer groups can be converted from classic group to consumer group and vice versa. The config is kept as an internal one while we develop the feature.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
2024-04-10 10:59:26 -07:00
Dongnuo Lyu 9bc48af1c1
MINOR: Add type check to classic group timeout operations (#15587)
When implementing the group type conversion from a classic group to a consumer group, if the replay of conversion records fails, the group should be reverted back including its timeouts.

A possible solution is to keep all the classic group timeouts and add a type check to the timeout operations. If the group is successfully upgraded, it won't be able to pass the type check and its operations will be executed without actually doing anything; if the group upgrade fails, the group map will be reverted and the timeout operations will be executed as is.

We've already have group type check in consumer group timeout operations. This patch adds similar type check to those classic group timeout operations.

Reviewers: David Jacot <djacot@confluent.io>
2024-04-10 00:36:49 -07:00
Erik van Oosten 8e61f04228
MINOR: Fix usage of none in javadoc (#15674)
- Use `Empty` instead of 'none' when referring to `Optional` values.
- `Headers.lastHeader` returns `null` when no header is found.
- Fix minor spelling mistakes.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-04-08 08:43:05 +08:00
Jeff Kim b3116f4f76
KAFKA-16148: Implement GroupMetadataManager#onUnloaded (#15446)
This patch completes all awaiting futures when a group is unloaded.

Reviewers: David Jacot <djacot@confluent.io>
2024-04-02 03:16:02 -07:00
Sean Quah ad960635a9
KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification (#15559)
KIP-890 Part 1 introduced verification of transactions with the
transaction coordinator on the `Produce` and `TxnOffsetCommit` paths.
This introduced the possibility of new errors when responding to those
requests. For backwards compatibility with older clients, a choice was
made to convert some of the new retriable errors to existing errors that
are expected and retried correctly by older clients.

`NETWORK_EXCEPTION` was forgotten about and not converted, but can occur
if, for example, the transaction coordinator is temporarily refusing
connections. Now, we convert it to:
 * `NOT_ENOUGH_REPLICAS` on the `Produce` path, just like the other
   retriable errors that can arise from transaction verification.
 * `COORDINATOR_LOAD_IN_PROGRESS` on the `TxnOffsetCommit` path. This
   error does not force coordinator lookup on clients, unlike
   `COORDINATOR_NOT_AVAILABLE`. Note that this deviates from KIP-890,
   which says that retriable errors should be converted to
   `COORDINATOR_NOT_AVAILABLE`.

Reviewers: Artem Livshits <alivshits@confluent.io>, David Jacot <djacot@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-03-25 16:08:23 -07:00
David Jacot be17df6fda
KAFKA-16374; High watermark updates should have a higher priority (#15534)
When the group coordinator is under heavy load, the current mechanism to release pending events based on updated high watermark, which consist in pushing an event at the end of the queue, is bad because pending events pay the cost of the queue twice. A first time for the handling of the first event and a second time for the handling of the hwm update. This patch changes this logic to push the hwm update event to the front of the queue in order to release pending events as soon as as possible.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-03-25 01:20:10 -07:00
Sanskar Jhajharia 2e8d69b78c
KAFKA-16314: Introducing the AbortableTransactionException (#15486)
As a part of KIP-890, we are introducing a new class of Exceptions which when encountered shall lead to Aborting the ongoing Transaction. The following PR introduces the same with client side handling and server side changes.

On client Side, the code attempts to handle the exception as an Abortable error and ensure that it doesn't take the producer to a fatal state. For each of the Transactional APIs, we have added the appropriate handling. For the produce request, we have verified that the exception transitions the state to Aborted.
On the server side, we have bumped the ProduceRequest, ProduceResponse, TxnOffestCommitRequest and TxnOffsetCommitResponse Version. The appropriate handling on the server side has been added to ensure that the new error case is sent back only for the new clients. The older clients will continue to get the old Invalid_txn_state exception to maintain backward compatibility.

Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-03-22 11:26:07 -07:00
Dongnuo Lyu 3e3c618bdc
KAFKA-16313: Offline group protocol migration (#15546)
This patch enables an empty classic group to be automatically converted to a new consumer group and vice versa.

Reviewers: David Jacot <djacot@confluent.io>
2024-03-20 00:49:11 -07:00
David Jacot c66d66dc67
KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received (#15533)
This patch fixes a bug in the logic which decides when a full ConsumerGroupHeartbeat response must be returned to the client. Prior to it, the logic only relies on the `ownedTopicPartitions` field to check whether the response was a full response. This is not enough because `ownedTopicPartitions` is also set in different situations. This patch changes the logic to check `ownedTopicPartitions`, `subscribedTopicNames` and `rebalanceTimeoutMs` as they are the only three non optional fields.

Reviewers: Lianet Magrans <lianetmr@gmail.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-03-19 13:48:41 -07:00
David Jacot 3599823288
MINOR: Remove unused client side assignor fields/classes (#15545)
In https://github.com/apache/kafka/pull/15364, we introduced, thoughtfully, a non-backward compatible record change for the new consumer group protocol. So it is a good opportunity for cleaning unused fields, mainly related to the client side assignor logic which is not implemented yet. It is better to introduce them when we need them and more importantly when we implement it.

Note that starting from 3.8, we won't make such changes anymore. Non-backward compatible changes are still acceptable now because we clearly said that upgrade won't be supported from the KIP-848 EA.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-03-18 00:52:08 -07:00
Dongnuo Lyu fa190cf18e
MINOR: Only enable replay methods to modify timeline data structure (#15528)
The patch prevents the main method (the method generating records) from modifying the timeline data structure `groups`  by calling `getOrMaybeCreateConsumerGroup` in kip-848 new group coordinator. Only replay methods are able to add the newly created group to `groups`.

Reviewers: David Jacot <djacot@confluent.io>
2024-03-15 05:24:59 -07:00
David Jacot e164d4d426
KAFKA-16249; Improve reconciliation state machine (#15364)
This patch re-work the reconciliation state machine on the server side with the goal to fix a few issues that we have recently discovered.
* When a member acknowledges the revocation of partitions (by not reporting them in the heartbeat), the current implementation may miss it. The issue is that the current implementation re-compute the assignment of a member whenever there is a new target assignment installed. When it happens, it does not consider the reported owned partitions at all. As the member is supposed to only report its own partitions when they change, the member is stuck.
* Similarly, as the current assignment is re-computed whenever there is a new target assignment, the rebalance timeout, as it is currently implemented, becomes useless. The issue is that the rebalance timeout is reset whenever the member enters the revocation state. In other words, in the current implementation, the timer is reset when there are no target available even if the previous revocation is not completed yet.

The patch fixes these two issues by not automatically recomputing the assignment of a member when a new target assignment is available. When the member must revoke partitions, the coordinator waits. Otherwise, it recomputes the next assignment. In other words, revoking is really blocking now.

The patch also proposes to include an explicit state in the record. It makes the implementation cleaner and it also makes it more extensible in the future.

The patch also changes the record format. This is a non-backward compatible change. I think that we should do this change to cleanup the record. As KIP-848 is only in early access in 3.7 and that we clearly state that we don't plane to support upgrade from it, this is acceptable in my opinion.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-03-14 00:54:28 -07:00
David Jacot f5c4d522fd
MINOR: Add read/write all operation (#15462)
There are a few cases in the group coordinator service where we want to read from or write to each of the known coordinators (each of __consumer_offsets partitions). The current implementation needs to get the list of the known coordinators then schedules the operation and finally aggregate the results. This patch is an attempt to streamline this by adding multi read/write to the runtime.

Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2024-03-07 07:51:04 -08:00
Dongnuo Lyu 2f401ff4c8
MINOR: parameterize group-id in GroupMetadataManagerTestContext (#15467)
This pr parameterize some group ids in GroupMetadataManagerTestContext that are now constant strings.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2024-03-05 22:50:58 +08:00
David Jacot d066b94c81
MINOR: Fix UpdatedImage and HighWatermarkUpdated events' logs (#15432)
I have noticed the following log when a __consumer_offsets partition immigrate from a broker. It appends because the event is queued up after the event that unloads the state machine. This patch fixes it and fixes another similar one.

```
[2024-02-06 17:14:51,359] ERROR [GroupCoordinator id=1] Execution of UpdateImage(tp=__consumer_offsets-28, offset=13251) failed due to This is not the correct coordinator.. (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)
org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
```

Reviewers: Justine Olshan <jolshan@confluent.io>
2024-02-29 07:01:21 -08:00
David Jacot 52289c92be
MINOR: Optimize EventAccumulator (#15430)
`poll(long timeout, TimeUnit unit)` is either used with `Long.MAX_VALUE` or `0`. This patch replaces it with `poll` and `take`. It removes the `awaitNanos` usage.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-02-28 05:38:02 -08:00
Jeff Kim 0979327520
KAFKA-16306: fix GroupCoordinatorService logger (#15433)
This patch corrects the logger used for GroupCoordinatorService.

Reviewers: Anton Liauchuk <anton93lev@gmail.com>, David Jacot <djacot@confluent.io>
2024-02-27 05:45:55 -08:00
David Jacot 5edf52359a
MINOR: Fix group metadata loading log (#15368)
Spotted the following log: 
```
[2024-02-14 09:59:30,103] INFO [GroupCoordinator id=1] Finished loading of metadata from 39 in __consumer_offsets-4ms with epoch 2 where 39ms was spent in the scheduler. Loaded 0 records which total to 0 bytes. (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)
```
The partition and the time are incorrect. This patch fixes it.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Bruno Cadonna <bruno@confluent.io>
2024-02-15 00:19:43 -08:00
David Jacot d24abe0ede
MINOR: Refactor GroupMetadataManagerTest (#15348)
`GroupMetadataManagerTest` class got a little under control. We have too many things defined in it. As a first steps, this patch extracts all the inner classes. It also extracts all the helper methods. However, the logic is not changed at all.

Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Justine Olshan <jolshan@confluent.io>
2024-02-13 23:29:29 -08:00
David Jacot 9865d54c42
MINOR: EventAccumulator should signal one thread when key becomes available (#15340)
`signalAll` was mistakenly used instead of `signal` when a key become available in the `EventAccumulator`. The fix relies on existing tests.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-02-09 04:29:04 -08:00
Ritika Reddy 68745ef21a
KAFKA-15460: Add group type filter to List Groups API (#15152)
This patch adds the support for filtering groups by types (Classic or Consumer) to both the old and the new group coordinators.

Reviewers: David Jacot <djacot@confluent.io>
2024-02-05 00:56:39 -08:00
David Jacot af41fc3614
KAFKA-16168; Implement GroupCoordinator.onPartitionsDeleted (#15237)
This patch implements `GroupCoordinator.onPartitionsDeleted` that is called whenever a partition is deleted and must deleted all the offsets related to them. The patch uses a naive approach similar to the one used in the old coordinator. It basically iterates over all the regular end pending offsets and deletes the ones matching the deleted partition set.

Reviewers: Justine Olshan <jolshan@confluent.io>
2024-02-01 00:28:32 -08:00
David Jacot 0472db2cd3
MINOR: Uniformize error handling/transformation in GroupCoordinatorService (#15196)
This patch uniformizes the error handling in the GroupCoordinatorService with the aim to reuse the same error translation for all operations. It also ensures that exceptions are unwrapped if needed.

Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-01-30 23:23:58 -08:00
DL1231 82920ffad0
KAFKA-16095: Update list group state type filter to include the states for the new consumer group type (#15211)
While using —list —state the current accepted values correspond to the classic group type states. This patch adds the new states introduced by KIP-848. It also make the matching on the server case insensitive.

Co-authored-by: d00791190 <dinglan6@huawei.com>

Reviewers: Ritika Reddy <rreddy@confluent.io>, David Jacot <djacot@confluent.io>
2024-01-29 07:19:05 -08:00
David Jacot e7fa0edd63
KAFKA-14505; [8/8] Update offset delete paths (#15221)
This is the last patch to complete the implementation of the transactional offsets. This patch updates the following paths:
* delete offsets - the patch ensures that a tombstone is written for pending transactional offsets too.
* delete all offsets - the patch ensures that all pending transactional offsets are deleted too.
* expire offsets - the patch ensures that an offset for a partition is not expire is there is a pending transaction.
* replay offset record - the patch ensures that all pending transactional offsets are removed when a tombstone is received.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Dongnuo Lyu <dlyu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-01-26 05:16:22 -08:00
Dongnuo Lyu c6194bbb0a
MINOR: populate TopicName in ConsumerGroupDescribe (#15205)
The patch populates the topic name of `ConsumerGroupDescribeResponseData.TopicPartitions` with the corresponding topic id in `ConsumerGroupDescribe`.

Reviewers: David Jacot <djacot@confluent.io>
2024-01-25 05:16:33 -08:00
Apoorv Mittal 208f9e7765
KAFKA-15813: Evict client instances from cache (KIP-714) (#15234)
KIP-714 requires client instance cache in broker which should also have a time-based eviction policy where client instances which are not actively sending metrics should be evicted. KIP mentions This client instance specific state is maintained in broker memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
2024-01-23 15:06:02 -08:00
David Jacot 4d6a422e86
KAFKA-14505; [7/N] Always materialize the most recent committed offset (#15183)
When transactional offset commits are eventually committed, we must always keep the most recent committed when we have a mix of transactional and regular offset commits. We achieve this by storing the offset of the offset commit record along side the committed offset in memory. Without preserving information of the commit record offset, compaction of the __consumer_offsets topic itself may result in the wrong offset commit being materialized.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2024-01-22 23:26:40 -08:00
Omnia Ibrahim 62ce551826
KAFKA-15853: Move KafkaConfig.Defaults to server module (#15158)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>
, David Jacot <djacot@confluent.io>, Nikolay <NIzhikov@gmail.com>
2024-01-22 15:29:11 +01:00
David Jacot cf90382fb9
KAFKA-16147; Partition is assigned to two members at the same time (#15212)
We had a case where a partition got assigned to two members and we found a bug in the partition epochs bookkeeping. Basically, when a member has a partition pending revocation re-assigned to him before the revocation is completed, the partition epoch is lost. Here is an example of such transition:

```
[2024-01-16 12:10:52,613] INFO [GroupCoordinator id=1 topic=__consumer_offsets partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member M2 transitioned from CurrentAssignment(memberEpoch=11, previousMemberEpoch=9, targetMemberEpoch=14, state=revoking, assignedPartitions={}, partitionsPendingRevocation={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 3, 4, 5, 6, 7]}, partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[0, 5]}) to CurrentAssignment(memberEpoch=15, previousMemberEpoch=11, targetMemberEpoch=15, state=stable, assignedPartitions={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 3, 4, 5, 6, 7]}, partitionsPendingRevocation={}, partitionsPendingAssignment={}). (org.apache.kafka.coordinator.group.GroupMetadataManager)
```

This patch fixes the bug and also strengthen the partition epochs bookkeeping to not accept such invalid transitions.

Reviewers: Justine Olshan <jolshan@confluent.io>
2024-01-22 01:16:20 -08:00
Jeff Kim 96f852f9e7
MINOR: log new coordinator partition load schedule time (#15017)
The current load summary exposes the time from when the partition load operation is scheduled to when the load completes. We are missing the information of how long the scheduled operation stays in the scheduler. Log that information.

Reviewers: David Jacot <djacot@confluent.io>
2024-01-18 02:28:17 -08:00
David Arthur 7bf7fd99a5
KAFKA-16078: Be more consistent about getting the latest MetadataVersion
This PR creates MetadataVersion.latestTesting to represent the highest metadata version (which may be unstable) and MetadataVersion.latestProduction to represent the latest version that should be used in production. It fixes a few cases where the broker was advertising that it supported the testing versions even when unstable metadata versions had not been configured.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
2024-01-17 14:59:22 -08:00
David Jacot 3a6e699f13
KAFKA-16118; Coordinator unloading fails when replica is deleted (#15182)
When a replica is deleted, the unloading procedure of the coordinator is called with an empty leader epoch. However, the current implementation of the new group coordinator throws an exception in this case. My bad. This patch updates the logic to handle it correctly.

We discovered the bug in our testing environment. We will add a system test or an integration test in a subsequent patch to better exercise this path.

Reviewers: Justine Olshan <jolshan@confluent.io>
2024-01-12 15:34:52 -08:00
David Jacot 6b9cb5ccbf
KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support (#15155)
This patch adds `UNSTABLE_OFFSET_COMMIT` errors support in the new group coordinator. `UNSTABLE_OFFSET_COMMIT` errors for partitions with unstable offset commits. Here unstable means that there are ongoing transactions.

Reviewers: Justine Olshan <jolshan@confluent.io>
2024-01-12 05:33:39 -08:00
Divij Vaidya 65424ab484
MINOR: New year code cleanup - include final keyword (#15072)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Sagar Rao <sagarmeansocean@gmail.com>
2024-01-11 17:53:35 +01:00
David Jacot 6ff21ee1e0
MINOR: Disalow using a group id with only whitespaces in the new consumer group protocol (#15173)
This patch strengthen the validation of the group id when the new consumer group protocol is used.

Reviewers: Divij Vaidya <diviv@amazon.com>
2024-01-11 07:04:18 -08:00
David Jacot a8203f9c7a
KAFKA-14505; [4/N] Wire transaction verification (#15142)
This patch wires the transaction verification in the new group coordinator. It basically calls the verification path before scheduling the write operation. If the verification fails, the error is returned to the caller.

Note that the patch uses `appendForGroup`. I suppose that we will move away from using it when https://github.com/apache/kafka/pull/15087 is merged.

Reviewers: Justine Olshan <jolshan@confluent.io>
2024-01-11 04:58:57 -08:00
Omnia Ibrahim dba789dc93
KAFKA-15853: Move OffsetConfig to group-coordinator module (#15161)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, David Jacot <djacot@confluent.io>, Nikolay <nizhikov@apache.org>
2024-01-11 10:19:42 +01:00
Jeff Kim cd3b3d9804
MINOR: fix custom retry backoff in new group coordinator (#15170)
When a retryable write operation fails, we retry with the default 500ms backoff. If a custom retry backoff was used to originally schedule the operation, we should retry with the same custom backoff instead of the default.

Reviewers: David Jacot <djacot@confluent.io>
2024-01-11 00:28:32 -08:00
Jeff Kim ac7ddc7d46
MINOR: Remove classic group preparing rebalance sensor (#15143)
Remove "group-rebalance-rate" and "group-rebalance-count" metrics from the new coordinator as this is not part of KIP-848.

Reviewers: David Jacot <djacot@confluent.io>
2024-01-09 01:10:21 -08:00
vamossagar12 d5aa341a18
MINOR: Fix flaky test GroupMetadataManagerTest.testStaticMemberGetsBackAssignmentUponRejoin (#15100)
Reviewers: Divij Vaidya <diviv@amazon.com>

---------

Co-authored-by: Sagar Rao <sagarrao@Sagars-MacBook-Pro.local>
2023-12-31 12:47:14 +01:00
David Jacot 98aca56ee5
KAFKA-16040; Rename `Generic` to `Classic` (#15059)
People has raised concerned about using `Generic` as a name to designate the old rebalance protocol. We considered using `Legacy` but discarded it because there are still applications, such as Connect, using the old protocol. We settled on using `Classic` for the `Classic Rebalance Protocol`.

The changes in this patch are extremely mechanical. It basically replaces the occurrences of `generic` by `classic`.

Reviewers: Divij Vaidya <diviv@amazon.com>, Lucas Brutschy <lbrutschy@confluent.io>
2023-12-21 13:39:17 -08:00
David Jacot 79757b3081
KAFKA-14505; [3/N] Wire WriteTxnMarkers API (#14985)
This patch wires the handling of makers written by the transaction coordinator via the WriteTxnMarkers API. In the old group coordinator, the markers are written to the logs and the group coordinator is informed to materialize the changes as a second step if the writes were successful. This approach does not really work with the new group coordinator for mainly two reasons: 1) The second step would actually fail while the coordinator is loading and there is no guarantee that the loading has picked up the write or not; 2) It does not fit well with the new memory model where the state is snapshotted by offset. In both cases, it seems that having a single writer to the `__consumer_offsets` partitions is more robust and preferable.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-12-21 10:59:41 -08:00
Jeff Kim 4613286076
KAFKA-16030: new group coordinator should check if partition goes offline during load (#15043)
The new coordinator stops loading if the partition goes offline during load. However, the partition is still considered active. Instead, we should return NOT_LEADER_OR_FOLLOWER exception during load.

Another change is that we only want to invoke CoordinatorPlayback#updateLastCommittedOffset if the current offset (last written offset) is greater than or equal to the current high watermark. This is to ensure that in the case the high watermark is ahead of the current offset, we don't clear snapshots prematurely.

Reviewers: David Jacot <djacot@confluent.io>
2023-12-21 06:17:35 -08:00
Jeff Kim f3038d5e73
KAFKA-15870: Move new group coordinator metrics from Yammer to Metrics (#14848)
This patch moves all the newly introduced metrics to the Kafka Metrics.

Reviewers: David Jacot <djacot@confluent.io>
2023-12-19 23:37:40 -08:00
Jeff Kim 3bd8ec16f6
MINOR: Transform new coordinator error before returning to client (#15001)
This was missing from https://issues.apache.org/jira/browse/KAFKA-14500. The existing coordinator transforms the log append error before returning to client. Apply the same transformation.

Reviewers: David Jacot <djacot@confluent.io>
2023-12-15 06:33:25 -08:00
vamossagar12 a1e985d22f
KAFKA-15237: Implement write operation timeout (#14981)
This patch ensure that `offset.commit.timeout.ms` is enforced. It does so by adding a timeout to the CoordinatorWriteEvent.

Reviewers: David Jacot <djacot@confluent.io>
2023-12-13 11:30:53 -08:00
Jeff Kim 0d9ee03742
KAFKA-15981: update Group size only when groups size changes (#14988)
Currently, we increment generic group metrics whenever we create a new Group object when we load a partition. This is incorrect as the partition may contain several records for the same group if in the active segment or if the segment has not yet been compacted.

The same applies to removing groups; we can possibly have multiple group tombstone records. Instead, only increment the metric if we created a new group and only decrement the metric if the group exists.

Reviewers: David Jacot <djacot@confluent.io>
2023-12-13 00:01:56 -08:00
David Jacot 131581a2b4
MINOR: Remove `SubscribedTopicRegex` field from `ConsumerGroupHeartbeatRequest` (#14956)
The support for regular expressions has not been implemented yet in the new consumer group protocol. This patch removes the `SubscribedTopicRegex` from the `ConsumerGroupHeartbeatRequest` in preparation for 3.7. It seems better to bump the version and add it back when we implement the feature, as part of https://issues.apache.org/jira/browse/KAFKA-14517, instead of having an unused field in the request.

Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Justine Olshan <jolshan@confluent.io>
2023-12-10 23:53:08 -08:00
David Jacot 522c2864cd
KAFKA-14505; [2/N] Implement TxnOffsetCommit API (#14845)
This patch implements the TxnOffsetCommit API. When a transactional offset commit is received, it is stored in the pending transactional offsets structure and waits there until the transaction is committed or aborted. Note that the handling of the transaction completion is not implemented in this patch.

Reviewers: Justine Olshan <jolshan@confluent.io>
2023-12-07 02:51:22 -08:00
Jeff Kim b888fa1ec9
KAFKA-15910: New group coordinator needs to generate snapshots while loading (#14849)
After the new coordinator loads a __consumer_offsets partition, it logs the following exception when making a read operation (fetch/list groups, etc):

 ```
java.lang.RuntimeException: No in-memory snapshot for epoch 740745. Snapshot epochs are:
at org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:178)
at org.apache.kafka.timeline.SnapshottableHashTable.snapshottableIterator(SnapshottableHashTable.java:407)
at org.apache.kafka.timeline.TimelineHashMap$ValueIterator.<init>(TimelineHashMap.java:283)
at org.apache.kafka.timeline.TimelineHashMap$Values.iterator(TimelineHashMap.java:271)
```
 
This happens because we don't have a snapshot at the last updated high watermark after loading. We cannot generate a snapshot at the high watermark after loading all batches because it may contain records that have not yet been committed. We also don't know where the high watermark will advance up to so we need to generate a snapshot for each offset the loader observes to be greater than the current high watermark. Then once we add the high watermark listener and update the high watermark we can delete all of the older snapshots. 

Reviewers: David Jacot <djacot@confluent.io>
2023-12-06 08:38:05 -08:00
David Jacot 34e1dbbaba
MINOR: Add Uniform assignor to the default config (#14826)
This patch adds the `Uniform` assignor to the default list of supported assignors. It also do small changes in the code.

Reviewers: Justine Olshan <jolshan@confluent.io>
2023-12-05 00:32:50 -08:00
Jeff Kim 8038bc9342
KAFKA-14987 [2/2]; customize retry backoff for group/offsets expiration (#14870)
The group expiration log becomes noisy when we encounter a retry-able error as the retry backoff is fixed to 500 ms. Allow customizable retry backoff so that even in the case of failure we have a longer delay. The current default for offsetsRetentionCheckIntervalMs is set to 10 minutes so even if the operation fails we will "retry" after 10 minutes. 

Reviewers: David Jacot <djacot@confluent.io>
2023-12-05 00:18:56 -08:00
Max Riedel b7c99e22a7
KAFKA-14509: [2/N] Implement server side logic for ConsumerGroupDescribe API (#14544)
This patch implements the ConsumerGroupDescribe API.

Reviewers: David Jacot <djacot@confluent.io>
2023-12-04 07:19:28 -08:00
David Jacot 5ae0b49839
KAFKA-14505; [1/N] Add support for transactional writes to CoordinatorRuntime (#14844)
This patch adds support for transactional writes to the CoordinatorRuntime framework. This mainly consists in adding CoordinatorRuntime#scheduleTransactionalWriteOperation and in adding the producerId and producerEpoch to various interfaces. The patch also extends the CoordinatorLoaderImpl and the CoordinatorPartitionWriter accordingly.

Reviewers: Justine Olshan <jolshan@confluent.io>
2023-11-29 08:54:23 -08:00
vamossagar12 bb1c4465c9
KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats (#14432)
This patch add the support for static membership to the new consumer group protocol. With a static member can join, re-join, temporarily leave and leave. When a member leaves with the expectation to rejoin, it must rejoin within the session timeout. It is kicks out from the consumer group otherwise.

Reviewers: David Jacot <djacot@confluent.io>
2023-11-28 10:08:16 -08:00
Dongnuo Lyu 891dd2a58a
KAFKA-15756: [1/2] Migrate existing integration tests to run old protocol in new coordinator (#14781)
This patch updates the testing framework to support running tests with kraft and the new group coordinator introduced in the context of KIP-848. This can be done by using `kraft+kip-848` as a quorum. Note that this is temporary until we make it the default and only option in 4.0. To verify this, this patch also enables kraft and kraft+kip-848 in PlaintextConsumerTest and its parent classes.

Reviewers: David Jacot <djacot@confluent.io>
2023-11-23 02:05:54 -08:00
Ritika Reddy 55017a4f68
KAFKA-15484: General Rack Aware Assignor (#14481)
This patch adds the second part of the Uniform Assignor, used when the subscriptions of each member in a consumer group are different.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
2023-11-23 01:18:50 -08:00
Jeff Kim 14065a7fdc
MINOR: fix MultiThreadedEventProcessorTest.testMetrics() (#14802)
Reviewers: David Jacot <djacot@confluent.io>
2023-11-21 00:16:32 -08:00
Jeff Kim 07fee62afe
KAFKA-14519; [2/N] New coordinator metrics (#14387)
This patch copy over existing metrics and add new consumer group metrics to the new GroupCoordinatorService.

Now that each coordinator is responsible for a topic partition, this patch introduces a GroupCoordinatorMetrics that records gauges for global metrics such as the number of generic groups in PreparingRebalance state, etc. For GroupCoordinatorShard specific metrics, GroupCoordinatorMetrics will activate new GroupCoordinatorMetricsShards that will be responsible for incrementing/decrementing TimelineLong objects and then aggregate the total amount across all shards.

As the CoordinatorRuntime/CoordinatorShard does not care about group metadata, we have introduced a CoordinatorMetrics.java/CoordinatorMetricsShard.java so that in the future transaction coordinator metrics can also be onboarded in a similar fashion.

Main files to look at:

GroupCoordinatorMetrics.java
GroupCoordinatorMetricsShard.java
CoordinatorMetrics.java
CoordinatorMetricsShard.java
CoordinatorRuntime.java
Metrics to add after #14408 is merged:

offset deletions sensor (OffsetDeletions); Meter(offset-deletion-rate, offset-deletion-count)
Metrics to add after https://issues.apache.org/jira/browse/KAFKA-14987 is merged:

offset expired sensor (OffsetExpired); Meter(offset-expiration-rate, offset-expiration-count)

Reviewers: Justine Olshan <jolshan@confluent.io>
2023-11-20 21:38:50 -08:00
Dongnuo Lyu b1796ce6d2
KAFKA-15849: Fix ListGroups API when runtime partition size is zero (#14785)
When the group coordinator does not host any __consumer_offsets partitions, the existing ListGroup implementation won't schedule any operation, thus a `new CompletableFuture<>()` is returned directly and never gets completed. This patch fixes the issue.

Reviewers: David Jacot <djacot@confluent.io>
2023-11-17 04:48:02 -08:00
Jay Wang a64037cdef
MINOR: Fix GroupCoordinatorShardTest stubbing (#14637)
This patch fixes incorrect stubs in GroupCoordinatorShardTest.

Reviewers: David Jacot <djacot@confluent.io>
2023-11-14 23:45:11 -08:00
Ritika Reddy 1e7e9ce918
KAFKA 14515: Optimized Uniform Rack Aware Assignor (#14416)
This patch adds the Optimized Uniform Rack Aware Assignor.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
2023-11-05 22:33:48 -08:00
Dongnuo Lyu 7bdd1a015e
KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator (#14589)
In `KafkaApis.scala`, we build the API response differently if exceptions are thrown during the API execution. Since the new group coordinator only populates the response with error code instead of throwing an exception when an error occurs, there may be different behavior between the existing group coordinator and the new one.

This patch:
- Fixes the response building in `KafkaApis.scala` for the two APIs affected by such difference -- OffsetFetch and OffsetDelete.
- In `GroupCoordinatorService.java`, returns a response with error code instead of a failed future when the coordinator is not active.

Reviewers: David Jacot <djacot@confluent.io>
2023-10-31 03:11:52 -07:00
Ritika Reddy a48ca490e4
KAFKA-15643: Fix error logged when unload is called on a broker that was never a coordinator. (#14657)
When a new leader is elected for a __consumer_offset partition, the followers are notified to unload the state. However, only the former leader is aware of it. The remaining follower prints out the following error:
`ERROR [GroupCoordinator id=1] Execution of UnloadCoordinator(tp=__consumer_offsets-1, epoch=0) failed due to This is not the correct coordinator.. (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)`
The error is actually correct and expected when in the remaining follower case, however this could be misleading. This patch handles the case gracefully.

Reviewers: David Jacot <djacot@confluent.io>
2023-10-31 03:09:32 -07:00
Jeff Kim abee8f711c
KAFKA-14519; [1/N] Implement coordinator runtime metrics (#14417)
Implements the following metrics:

kafka.server:type=group-coordinator-metrics,name=num-partitions,state=loading
kafka.server:type=group-coordinator-metrics,name=num-partitions,state=active
kafka.server:type=group-coordinator-metrics,name=num-partitions,state=failed
kafka.server:type=group-coordinator-metrics,name=event-queue-size
kafka.server:type=group-coordinator-metrics,name=partition-load-time-max
kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg
kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-min
kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-avg
The PR makes these metrics generic so that in the future the transaction coordinator runtime can implement the same metrics in a similar fashion.

Also, CoordinatorLoaderImpl#load will now return LoadSummary which encapsulates the start time, end time, number of records/bytes.

Co-authored-by: David Jacot <djacot@confluent.io>

Reviewers:  Ritika Reddy <rreddy@confluent.io>, Calvin Liu <caliu@confluent.io>, David Jacot <djacot@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-10-17 16:06:23 -07:00
Jeff Kim 7b5d640cc6
KAFKA-14987; Implement Group/Offset expiration in the new coordinator (#14467)
This patch implements the groups and offsets expiration in the new group coordinator.

Reviewers: Ritika Reddy <rreddy@confluent.io>, David Jacot <djacot@confluent.io>
2023-10-11 23:45:13 -07:00
Dongnuo Lyu 4a6e6c7d8c
KAFKA-14504: Implement DescribeGroups API (#14462)
This patch implements DescribeGroups API in the new group coordinator.

Reviewers: David Jacot <djacot@confluent.io>
2023-10-06 02:25:17 -07:00
Kirk True 59e59fc545
MINOR: Add LEAVE_GROUP_EPOCH to GroupMetadataManager (#14463)
Replacing the use a hardcoded -1 with a constant (`LEAVE_GROUP_EPOCH`) that provides more clarity. Since static members also have a magic number (-2), this will motivate future commits to use constants instead of hardcoded values.

Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, David Jacot <djacot@confluent.io>
2023-10-04 03:09:16 -07:00
Dongnuo Lyu a12f9f97c9
KAFKA-14506: Implement DeleteGroups API and OffsetDelete API (#14408)
This patch implements DeleteGroups and OffsetDelete API in the new group coordinator.

Reviewers: yangy0000, Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
2023-10-04 02:30:45 -07:00
vamossagar12 f5a7491da6
MINOR: Correcting Javadocs for throwIfMemberEpochIsInvalid (#14468)
The Javadoc for `GroupMetadataManager#throwIfMemberEpochIsInvalid` suggests that it throws a `NotCoordinatorException` exception when the member epoch in the consumer heartbeat request is greater than the one known by this coordinator for the given member. This could happen if the heartbeat-ing member got a higher epoch from another coordinator. However the method throws `FencedMemberEpochException` even in this case. This PR corrects the Javadocs to reflect the same.

Reviewers: David Jacot <djacot@confluent.io>
2023-10-02 05:01:01 -07:00
David Jacot 6acf69d7a2
MINOR: Remove the client side assignor from the ConsumerGroupHeartbeat API (#14469)
As a first step, we plan to release a preview of the new consumer group rebalance protocol without the client side assignor. This patch removes all the related fields from the ConsumerGroupHeartbeat API for now. Removing fields is fine here because this API is not released yet and not exposed by default. We will add them back while bumping the version of the request when we release this part in the future.

Reviewers: Justine Olshan <jolshan@confluent.io>
2023-10-02 04:59:20 -07:00
vamossagar12 62afd3baca
MINOR: Fixing typo with CoordinatorRuntime Javadoc (#14461)
Reviewers: David Jacot <djacot@confluent.io>
2023-09-28 09:16:34 -07:00
zhaohaidao f309299f3c
KAFKA-14503: Implement ListGroups (#14271)
This patch implements the ListGroups API in the new group coordinator.

Reviewers: David Jacot <djacot@confluent.io>
2023-09-14 23:45:03 -07:00
Jeff Kim e9057aab37
KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator (#14147)
This patch implements the LeaveGroup API in the new group coordinator.

Reviewers: David Jacot <djacot@confluent.io>
2023-09-13 01:43:37 -07:00
David Jacot 7054625c45
KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest (#14321)
This patch adds the MemberId and the MemberEpoch fields to the OffsetFetchRequest. Those fields will be populated when the new consumer group protocol is used to ensure that the member fetching the offset has the correct member id and epoch. If it does not, UNKNOWN_MEMBER_ID or STALE_MEMBER_EPOCH are returned to the client.

Our initial idea was to implement the same for the old protocol. The field is called GenerationIdOrMemberEpoch in KIP-848 to materialize this. As a second though, I think that we should only do it for the new protocol. The effort to implement it in the old protocol is not worth it in my opinion.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-09-05 23:36:38 -07:00
Jeff Kim 6391c66035
KAFKA-14500; [7/7] Refactor GroupMetadataManagerTest (#14122)
This patch makes the styling consistent inside GroupMetadataManagerTest. Also, it adds JoinResult to simplify the JoinGroup API responses in the tests.

Reviewers: David Arthur <mumrah@gmail.com>, David Jacot <djacot@confluent.io>
2023-09-01 06:36:33 -07:00
David Jacot dcff0878c4
KAFKA-14499: [5/N] Refactor GroupCoordinator.fetchOffsets and GroupCoordinator.fetchAllOffsets (#14310)
This patch refactors the GroupCoordinator.fetchOffsets and GroupCoordinator.fetchAllOffsets methods to take an OffsetFetchRequestGroup and to return an OffsetFetchResponseGroup. It prepares the ground for adding the member id and the member epoch to the OffsetFetchRequest. This change also makes those two methods more aligned with the others in the interface.

Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-09-01 03:45:24 -07:00
David Jacot 68b7031dc4
KAFKA-14499: [4/N] Implement OffsetFetch API (#14120)
This patch implements the OffsetFetch API in the new group coordinator.

I found out that implementing the `RequireStable` flag is hard (to not say impossible) in the current model. For the context, the flag is here to ensure that an OffsetRequest request does not return stale offsets if there are pending offsets to be committed. In the scala code, we basically check the pending offsets data structure and if they are any pending offsets, we return the `UNSTABLE_OFFSET_COMMIT` error. This tells the consumer to retry.

In our new model, we don't have the pending offsets data structure. Instead, we use a timeline data structure to handle all the pending/uncommitted changes. Because of this we don't know whether offsets are pending for a particular group. Instead of doing this, I propose to not return the `UNSTABLE_OFFSET_COMMIT` error anymore. Instead, when `RequireStable` is set, we use a write operation to ensure that we read the latest offsets. If they are uncommitted offsets, the write operation ensures that the response is only return when they are committed. This gives a similar behaviour in the end.

Reviewers: Justine Olshan <jolshan@confluent.io>
2023-08-28 07:02:56 -07:00
David Jacot 1574b9f16d
MINOR: Code cleanups in group-coordinator module (#14117)
This patch does a few code cleanups in the group-coordinator module.

It renames Coordinator to CoordinatorShard;
It renames ReplicatedGroupCoordinator to GroupCoordinatorShard. I was never really happy with this name. The new name makes more sense to me;
It removes TopicPartition from the GroupMetadataManager. It was only used in log messages. The log context already includes it so we don't have to log it again.
It renames assignors to consumerGroupAssignors.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-07-28 11:28:54 -07:00
Ritika Reddy 3709901c9e
KAFKA-14702: Extend server side assignor to support rack aware replica placement (#14099)
This patch updates the `PartitionAssignor` interface to support rack-awareness. The change introduces the `SubscribedTopicDescriber` interface that can be used to retrieve topic metadata such as the number of partitions or the racks from within an assignor. We use an interface because it allows us to wrap internal data structures instead of having to copy them. It is more efficient.

Reviewers: David Jacot <djacot@confluent.io>
2023-07-28 19:30:04 +02:00
Jeff Kim 19f9e1e6d0
KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator (#14056)
This patch implements the existing Heartbeat API in the new Group Coordinator.

Reviewers: David Jacot <djacot@confluent.io>
2023-07-28 15:13:27 +02:00
David Jacot dcabc295ec
KAFKA-14048; CoordinatorContext should be protected by a lock (#14090)
Accessing the `CoordinatorContext` in the `CoordinatorRuntime` should be protected by a lock. The runtime guarantees that the context is never access concurrently however it is accessed by multiple threads. The lock is here to ensure that we have a proper memory barrier. The patch does the following:
1) Adds a lock to `CoordinatorContext`;
2) Adds helper methods to get the context and acquire/release the lock.
3) Allow transition from Failed to Loading. Previously, the context was recreated in this case.

Reviewers: Justine Olshan <jolshan@confluent.io>
2023-07-28 14:49:48 +02:00
David Jacot 29825ee24f
KAFKA-14499: [3/N] Implement OffsetCommit API (#14067)
This patch introduces the `OffsetMetadataManager` and implements the `OffsetCommit` API for both the old rebalance protocol and the new rebalance protocol. It introduces version 9 of the API but keeps it as unstable for now. The patch adds unit tests to test the API. Integration tests will be done separately.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-07-27 13:18:10 +02:00
Jeff Kim d2fc907623
KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator (#14017)
This patch implements the SyncGroup API in the new group coordinator. All the new unit tests are based on the existing scala tests.

Reviewers: David Jacot <djacot@confluent.io>
2023-07-27 08:02:29 +02:00
David Jacot 2528dd4116
KAFKA-14499: [2/N] Add OffsetCommit record & related (#14047)
This patch does a few things:
1) It introduces the `OffsetAndMetadata` class which hold the committed offsets in the group coordinator.
2) It adds methods to deal with OffsetCommit records to `RecordHelpers`.
3) It adds `MetadataVersion#offsetCommitValueVersion` to get the version of the OffsetCommit value record that should be used.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Arthur <mumrah@gmail.com>, Justine Olshan <jolshan@confluent.io>
2023-07-21 20:09:06 +02:00
Jeff Kim a500c3ecf9
KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator (#13870)
This patch implements the existing JoinGroup protocol within the new group coordinator. 

Some notable differences:
* Methods return a CoordinatorResult to the runtime framework, which includes records to append to the log as well as a future to complete after the append succeeds/fails.
* The coordinator runtime ensures that only a single thread will be processing a group at any given time, therefore there is no more locking on groups.
* Instead of using on purgatories, we rely on the Timer interface to schedule/cancel delayed operations.

Reviewers: David Jacot <djacot@confluent.io>
2023-07-19 09:15:13 +02:00
David Jacot 32ff347b2c
KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer (#13991)
This patch wires the new group coordinator in BrokerServer (KRaft only). With this, it is now possible to run a cluster with the new group coordinator and to use the ConsumerGroupHeartbeat API by specifying the following two properties:
- group.coordinator.new.enable = true (to enable the new group coordinator)
- unstable.api.versions.enable = true (to enable unreleased APIs)

Note that the new group coordinator does not support all the existing APIs yet.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-07-14 17:41:06 +02:00
David Jacot aafbe34443
KAFKA-14462; [22/N] Implement session and revocation timeouts (#13963)
This patch adds the session timeout and the revocation timeout to the new consumer group protocol.

Reviewers: Calvin Liu <caliu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-07-12 11:10:51 +02:00
David Jacot 9cde3a7910
KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime (#13961)
This patch adds EventBasedCoordinatorTimer and the CoordinatorTimer interface.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-07-07 22:21:30 +02:00
David Jacot 98fbd8afc7
KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image (#13901)
This patch adds (1) the logic to propagate a new MetadataImage to the running coordinators; and (2) the logic to ensure that all the consumer groups subscribed to topics with changes will refresh their subscriptions metadata on the next heartbeat. In the mean time, it ensures that freshly loaded consumer groups also refresh their subscriptions metadata on the next heartbeat.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-07-05 18:28:38 +02:00
David Jacot 482299c4e2
KAFKA-14462; [19/N] Add CoordinatorLoader implementation (#13880)
This patch adds a coordinator loader implementation.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-06-29 08:12:53 +02:00
David Jacot a81486e4f8
KAFKA-14462; [18/N] Add GroupCoordinatorService (#13812)
This patch introduces the GroupCoordinatorService. This is the new (incomplete) implementation of the group coordinator based on the coordinator runtime introduced in https://github.com/apache/kafka/pull/13795.

Reviewers: Divij Vaidya <diviv@amazon.com>, Justine Olshan <jolshan@confluent.io>
2023-06-22 09:06:10 +02:00
minjian.cai 39a47c8999
MINOR: fix typos for group coordinator (#13886)
Reviewers: Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, Divij Vaidya <diviv@amazon.com>
2023-06-20 22:57:26 +02:00
minjian.cai 3d97743c67
MINOR: Fix some typos for core (#13882)
Reviewers:  Divij Vaidya <diviv@amazon.com>
2023-06-20 22:52:39 +02:00
David Jacot 7556ce366a
KAFKA-14462; [17/N] Add CoordinatorRuntime (#13795)
This patch introduces the CoordinatorRuntime. The CoordinatorRuntime is a framework which encapsulate all the common features requires to build a coordinator such as the group coordinator. Please refer to the javadoc of that class for the details.

Reviewers: Divij Vaidya <diviv@amazon.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-06-13 09:46:38 +02:00
David Jacot 7d147cf241
KAFKA-14462; [14/N] Add PartitionWriter (#13675)
This patch introduces the `PartitionWriter` interface in the `group-coordinator` module. The `ReplicaManager` resides in the `core` module and it is thus not accessible from the `group-coordinator` one. The `CoordinatorPartitionWriter` is basically an implementation of the interface residing in `core` which interfaces with the `ReplicaManager`.

One notable difference from the usual produce path is that the `PartitionWriter` returns the offset following the written records. This is then used by the coordinator runtime to track when the request associated with the write can be completed.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-06-06 16:24:48 +02:00
David Jacot e5d67b81fd
KAFKA-14462; [16/N] Add CoordinatorLoader and CoordinatorPlayback interfaces (#13794)
This patch adds the CoordinatorLoader and the CoordinatorPlayback interfaces. The former is used to load (or rebuild) the coordinator state by replaying all the records stored in the partition. The later is the interface that must be implemented by coordinator to support replaying records.

Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-06-02 21:21:51 +02:00
David Jacot 7c3a2846d4
KAFKA-14462; [15/N] Make Result generic and rename it (#13793)
This patch makes the record type generic, moves the class to the runtime package and finally rename the class to follow the naming of the other classes in the runtime package.

Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-06-02 09:18:09 +02:00
David Jacot 47551ea369
KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor (#13666)
Adds CoordinatorEvent, CoordinatorEventProcessor, and MultiThreadedEventProcessor.

Reviewers: Kirk True <ktrue@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-06-01 13:33:40 -07:00
David Jacot 49d9c6775d
KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup (#13639)
This patch adds the GroupMetadataManager to the group-coordinator module. This manager is responsible for handling the groups management, the members management and the entire reconciliation process. At this point, only the new consumer group type/protocol is implemented.

The new manager is based on an architecture inspired from the quorum controller. A request can access/read the state but can't mutate it directly. Instead, a list of records is generated together with the response and those records are applied to the state by the runtime framework. We use timeline data structures. Note that the runtime framework is not part of this patch. It will come in a following one.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-05-31 08:29:41 +02:00
Jeff Kim 15f8705246
KAFKA-14500; [4/N] Add Timer interface (#13708)
Reviewers: David Jacot <djacot@confluent.io>
2023-05-23 10:43:59 +02:00
Jeff Kim c98c1ed41c
KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers (#13704)
This path enables the new group metadata manager to generate GroupMetadataKey/Value records.

Reviewers: David Jacot <djacot@confluent.io>
2023-05-23 10:42:13 +02:00
Jeff Kim cc011f77aa
KAFKA-14500; [2/N] Rewrite GroupMetadata in Java (#13663)
This patch introduces `GenericGroup` which rewrite the `GroupMetadata` in Java. The `GenericGroup` is basically a group using the current rebalance protocol in the new group coordinator.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Christo Lolov <lolovc@amazon.com>, David Jacot <djacot@confluent.io>
2023-05-12 11:22:29 +02:00
Ritika Reddy 4653507926
KAFKA-14514; Add Range Assignor on the Server (KIP-848) (#13443)
This patch adds the RangeAssignor on the server for KIP-848. This range assignor is very different from the old client side implementation. We added functionality to make these assignments sticky while also inheriting crucial properties of the range assignor such as facilitating joins and distributing partitions of a topic somewhat equally amongst its subscribers.

Reviewers: Philip Nee <philipnee@gmail.com>, Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
2023-05-10 14:09:12 +02:00
Jeff Kim 228434d235
KAFKA-14500; [1/N] Rewrite MemberMetadata in Java (#13644)
This patch adds GenericGroupMember which is a rewrite of MemberMetadata in Java.

Reviewers: David Jacot <djacot@confluent.io>
2023-05-09 16:49:27 +02:00
David Jacot 7634eee262
KAFKA-14462; [11/N] Add CurrentAssignmentBuilder (#13638)
This patch adds the `CurrentAssignmentBuilder` class which encapsulates the reconciliation engine of the consumer group protocol. Given the current state of a member and a desired or target assignment state, the state machine takes the necessary steps to converge the member to its desired state.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Calvin Liu <caliu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-05-08 20:46:07 +02:00
David Jacot 16fc8e1cff
KAFKA-14462; [10/N] Add TargetAssignmentBuilder (#13637)
This patch adds TargetAssignmentBuilder. It is responsible for computing a target assignment for a given group.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-05-02 18:04:50 +02:00
David Jacot 8bde4e79cd
KAFKA-14462; [9/N] Add RecordHelpers (#13544)
This patch adds RecordHelpers.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-04-27 14:05:41 +02:00
David Jacot 9a36da12b7
KAFKA-14462; [8/N] Add ConsumerGroupMember (#13538)
This patch adds ConsumerGroupMember.

Reviewers: Christo Lolov <lolovc@amazon.com>, Jeff Kim <jeff.kim@confluent.io>, Jason Gustafson <jason@confluent.io>
2023-04-25 18:50:51 +02:00
David Jacot c39bf714bb
KAFKA-14462; [7/N] Add ClientAssignor, Assignment, TopicMetadata and VersionedMetadata (#13537)
This patch adds ClientAssignor, Assignment, TopicMetadata and VersionedMetadata classes.

Reviewers: Christo Lolov <lolovc@amazon.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-04-21 11:22:16 +02:00
Jeff Kim 61530d68ce
KAFKA-14869: Bump coordinator value records to flexible versions (KIP-915, Part-2) (#13526)
This patch implemented the second part of KIP-915. It bumps the versions of the value records used by the group coordinator and the transaction coordinator to make them flexible versions. The new versions are not used when writing to the partitions but only when reading from the partitions. This allows downgrades from future versions that will include tagged fields.

Reviewers: David Jacot <djacot@confluent.io>
2023-04-18 15:37:04 +02:00
David Jacot 741a27351e
KAFKA-14462; [6/N] Update Records (#13536)
This patch updates the KIP-848's records.

Reviewers: Christo Lolov <lolovc@amazon.com>, Jason Gustafson <jason@confluent.io>
2023-04-14 14:25:33 +02:00
Ritika Reddy f1e7a64bf6
MINOR: Refine `PartitionAssignor` server-side interface (#13524)
This patch updates the `PartitionAssignor` server-side interface used in the new group coordinator for the new consumer group protocol as follow:
- It switches subscription from topic names to topic ids in order to be closer to the server side implementation.
- It switches assignment from Set to Map<Integer, Set> to be closer to the server side implementation.
- It adds getters for all attributes.
- It makes all attributes final private.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Alexandre Dupriez <alexandre.dupriez@gmail.com>, David Jacot <djacot@confluent.io>
2023-04-14 14:22:51 +02:00
David Jacot 440a53099d
KAFKA-14462; [5/N] Add EventAccumulator (#13505)
This patch adds the `EventAccumulator` which will be used in the runtime of the new group coordinator. The aim of this accumulator is to basically have a queue per __consumer_group partitions and to ensure that events addressed to the same partitions are not processed concurrently. The accumulator is generic so we could reuse it in different context.

Reviewers: Alexandre Dupriez <alexandre.dupriez@gmail.com>, Justine Olshan <jolshan@confluent.io>
2023-04-13 08:33:40 +02:00
David Jacot e1e3900ba1
KAFKA-14462; [4/N] Add Group, Record and Result (#13520)
This patch adds Group, Record and Result.

Reviewers: Jason Gustafson <jason@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-04-12 13:16:49 +02:00
David Jacot 788cc11f45
KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface (#13357)
The new group coordinator needs to access cluster metadata (e.g. topics, partitions, etc.) and it needs a mechanism to be notified when the metadata changes (e.g. to trigger a rebalance). In KRaft clusters, the easiest is to subscribe to metadata changes via the MetadataPublisher.

Reviewers: Justine Olshan <jolshan@confluent.io>
2023-03-08 08:52:01 +01:00
David Jacot 6d37b0f07f
KAFKA-14462; [2/N] Add ConsumerGroupHeartbeart to GroupCoordinator interface (#13329)
This patch adds ConsumerGroupHeartbeat to the GroupCoordinator interface and implements the API in KafkaApis.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-03-07 09:20:03 +01:00
David Jacot 39962eeeb3
KAFKA-14513; Add broker side PartitionAssignor interface (#13202)
This patch adds the broker side `PartitionAssignor` interface as detailed in KIP-848. The interfaces differs a bit from the KIP in the following ways:
* The POJOs are not defined within the interface because the interface is to heavy like this.
* The interface is kept in the `group-coordinator` module for now. We don't want to have it out there until KIP-848 is ready to be released. We will move it to its final destination later.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>, Christo Lolov <lolovc@amazon.com>, Guozhang Wang <wangguoz@gmail.com>
2023-02-10 08:26:00 +01:00
David Jacot 659dd2e49f
KAFKA-14048: Add new `__consumer_offsets` records from KIP-848 (#13203)
This patch adds the new (only the new ones) `__consumer_offsets` records as described in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Records).

Reviewers: Christo Lolov <lolovc@amazon.com>, Mickael Maison <mickael.maison@gmail.com>
2023-02-09 09:10:28 +01:00
David Jacot 094e343f18
KAFKA-14678; Move `__consumer_offsets` records from `core` to `group-coordinator` (#13200)
This patch moves the current `__consumer_offsets` records from the `core` module to the new `group-coordinator` module.

Reviewers: Christo Lolov <lolovc@amazon.com>, Mickael Maison <mickael.maison@gmail.com>
2023-02-07 09:06:56 +01:00
David Jacot 2e0a005dd4
KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface (#13112)
This patch migrates all the internal APIs of the current group coordinator to the new `GroupCoordinator` interface. It also makes the current implementation package private to ensure that it is not used anymore.

Reviewers: Justine Olshan <jolshan@confluent.io>
2023-01-20 08:38:21 +01:00
David Jacot 700947aa5a
KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface (#12902)
This patch adds `OffsetDelete` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-01-17 20:39:01 +01:00
David Jacot a2926edc2f
KAFKA-14367; Add `TxnOffsetCommit` to the new `GroupCoordinator` interface (#12901)
This patch adds `TxnOffsetCommit` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-01-13 09:54:54 +01:00
David Jacot e6669672ef
KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (#12886)
This patch adds `OffsetCommit` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

Reviewers: Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-01-12 18:05:49 +01:00
David Jacot 24a86423e9
KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface (#12870)
This patch adds OffsetFetch to the new GroupCoordinator interface and updates KafkaApis to use it. 

Reviewers: Philip Nee <pnee@confluent.i>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
2023-01-10 11:38:31 -08:00
David Jacot f8556fe791
KAFKA-14367; Add `DeleteGroups` to the new `GroupCoordinator` interface (#12858)
This patch adds `deleteGroups` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

Reviewers: Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
2022-12-15 09:29:56 +01:00
David Jacot 4a9c0fa4a4
KAFKA-14367; Add `DescribeGroups` to the new `GroupCoordinator` interface (#12855)
This patch adds `describeGroups` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
2022-12-13 09:19:21 +01:00
David Jacot 854dfb5ffc
KAFKA-14367; Add `ListGroups` to the new `GroupCoordinator` interface (#12853)
This patch adds `listGroups` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
2022-12-07 20:42:42 +01:00
David Jacot df29b17fc4
KAFKA-14367; Add `LeaveGroup` to the new `GroupCoordinator` interface (#12850)
This patch adds `leaveGroup` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Jason Gustafson <jason@confluent.io>
2022-12-05 09:28:35 +01:00
David Jacot fd05073cc1
KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface (#12847)
This patch adds `syncGroup` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Jason Gustafson <jason@confluent.io>
2022-12-02 17:15:29 +01:00
David Jacot f5305fb38d
KAFKA-14367; Add `Heartbeat` to the new `GroupCoordinator` interface (#12848)
This patch adds `heartbeat` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Jason Gustafson <jason@confluent.io>
2022-12-01 19:59:33 +01:00
David Jacot 98e19b3000
KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface (#12845)
This patch adds `joinGroup` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

For the context, I will do the same for all the other interactions with the current group coordinator. In order to limit the changes, I have chosen to introduce the `GroupCoordinatorAdapter` that translates the new interface to the old one. It is basically a wrapper. This allows keeping the current group coordinator untouched for now and focus on the `KafkaApis` changes. Eventually, we can remove `GroupCoordinatorAdapter`.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
2022-11-29 20:39:12 +01:00
David Jacot 7387a11625
KAFKA-14363; Add new `group-coordinator` module (KIP-848) (#12827)
Introduce new group-coordinator module that will host the future new group coordinator as part of KIP-848.

Reviewers: Ismael Juma <ismael@juma.me.uk>
2022-11-09 08:49:57 +01:00