In `GroupMetadataManager#toTopicPartitions`, we generate a list of `ConsumerGroupHeartbeatRequestData.TopicPartitions` from the input deserialized subscription. Currently the input subscription is `ConsumerPartitionAssignor.Subscription`, where the topic partitions are stored as (topic-partition) pairs, whereas in `ConsumerGroupHeartbeatRequestData.TopicPartitions`, we need the topic partitions to be stored as (topic-partition list) pairs.
`ConsumerProtocolSubscription` is an intermediate data structure in the deserialization where the topic partitions are stored as (topic-partition list) pairs. This pr uses `ConsumerProtocolSubscription` instead as the input subscription to make `toTopicPartitions` more efficient.
Reviewers: David Jacot <djacot@confluent.io>
This patch:
- changes the order of the checks in `validateOnlineDowngrade`, so that only when the last member using the consumer protocol leave and the group still has classic member(s), `online downgrade is disabled` is logged if the policy doesn't allow downgrade.
- changes the session timeout in `convertToConsumerGroup` from `consumerGroupSessionTimeoutMs` to `member.classicProtocolSessionTimeout().get()`.
Reviewers: David Jacot <djacot@confluent.io>
This patch is the continuation of https://github.com/apache/kafka/pull/15964. It introduces the records coalescing to the CoordinatorRuntime. It also introduces a new configuration `group.coordinator.append.linger.ms` which allows administrators to chose the linger time or disable it with zero. The new configuration defaults to 10ms.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
Fix the following NPE:
```
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.coordinator.group.assignor.MemberAssignment.targetPartitions()" because the return value of "java.util.Map.get(Object)" is null
at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.canMemberParticipateInReassignment(GeneralUniformAssignmentBuilder.java:248)
at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.balance(GeneralUniformAssignmentBuilder.java:336)
at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.buildAssignment(GeneralUniformAssignmentBuilder.java:157)
at org.apache.kafka.coordinator.group.assignor.UniformAssignor.assign(UniformAssignor.java:84)
at org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.build(TargetAssignmentBuilder.java:302)
at org.apache.kafka.coordinator.group.GroupMetadataManager.updateTargetAssignment(GroupMetadataManager.java:1913)
at org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:1518)
at org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:2254)
at org.apache.kafka.coordinator.group.GroupCoordinatorShard.consumerGroupHeartbeat(GroupCoordinatorShard.java:308)
at org.apache.kafka.coordinator.group.GroupCoordinatorService.lambda$consumerGroupHeartbeat$0(GroupCoordinatorService.java:298)
at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.lambda$run$0(CoordinatorRuntime.java:769)
at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.withActiveContextOrThrow(CoordinatorRuntime.java:1582)
at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.access$1400(CoordinatorRuntime.java:96)
at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.run(CoordinatorRuntime.java:767)
at org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.handleEvents(MultiThreadedEventProcessor.java:144)
at org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.run(MultiThreadedEventProcessor.java:176)
```
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Justine Olshan <jolshan@confluent.io>
This patch renames the uniform assignor's builders to match the `SubscriptionType` which is used to determine which one is called. It removes the abstract class `AbstractUniformAssignmentBuilder` which is not necessary anymore. It also applies minor refactoring.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This patch moves the `PartitionAssignor` interface and all the related classes to a newly created `group-coordinator/api` module, following the pattern used by the storage and tools modules.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
During online migration, there could be ConsumerGroup that has members that uses the classic protocol. In the current implementation, `STALE_MEMBER_EPOCH` could be thrown in ConsumerGroup offset fetch/commit validation but it's not supported by the classic protocol. Thus this patch changed `ConsumerGroup#validateOffsetCommit` and `ConsumerGroup#validateOffsetFetch` to ensure compatibility.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
This patch reworks the `PartitionAssignor` interface to use interfaces instead of POJOs. It mainly introduces the `MemberSubscriptionSpec` interface that represents a member subscription and changes the `GroupSpec` interfaces to expose the subscriptions and the assignments via different methods.
The patch does not change the performance.
before:
```
Benchmark (memberCount) (partitionsToMemberRatio) (topicCount) Mode Cnt Score Error Units
TargetAssignmentBuilderBenchmark.build 10000 10 100 avgt 5 3.462 ± 0.687 ms/op
TargetAssignmentBuilderBenchmark.build 10000 10 1000 avgt 5 3.626 ± 0.412 ms/op
JMH benchmarks done
```
after:
```
Benchmark (memberCount) (partitionsToMemberRatio) (topicCount) Mode Cnt Score Error Units
TargetAssignmentBuilderBenchmark.build 10000 10 100 avgt 5 3.677 ± 0.683 ms/op
TargetAssignmentBuilderBenchmark.build 10000 10 1000 avgt 5 3.991 ± 0.065 ms/op
JMH benchmarks done
```
Reviewers: David Jacot <djacot@confluent.io>
The time taken to compute a new assignment is critical. This patches extending the existing logging to log it too. This is very useful information to have.
Reviewers: Luke Chen <showuon@gmail.com>
Fix the bug where the group downgrade to a classic one when a member leaves, even though the consumer group size is still larger than `classicGroupMaxSize`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
This patch is a small refactoring which mainly aims at avoid to construct a copy of the new target assignment in the TargetAssignmentBuilder because the copy is not used by the caller. The change relies on the exiting tests and it does not really have an impact on performance (e.g. validated with TargetAssignmentBuilderBenchmark).
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
KIP-932 introduces share groups to go alongside consumer groups. Both kinds of group use server-side assignors but it is unlikely that a single assignor class would be suitable for both. As a result, the KIP introduces specific interfaces for consumer group and share group partition assignors.
This PR introduces only the consumer group interface, `o.a.k.coordinator.group.assignor.ConsumerGroupPartitionAssignor`. The share group interface will come in a later release. The existing implementations of the general `PartitionAssignor` interface have been changed to implement `ConsumerGroupPartitionAssignor` instead and all other code changes are just propagating the change throughout the codebase.
Note that the code in the group coordinator that actually calculates assignments uses the general `PartitionAssignor` interface so that it can be used with both kinds of group, even though the assignors themselves are specific.
Reviewers: Apoorv Mittal <amittal@confluent.io>, David Jacot <djacot@confluent.io>
This patch implements the LeaveGroup API to the consumer groups that are in the mixed mode.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
This patch speeds up the computation of the unassigned partitions by exposing the inverted target assignment. It allows the assignor to check whether a partition is assigned or not.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit. Otherwise, we default the write limit to the min buffer size of 16384 for the write limit. This causes the coordinator to threw RecordTooLargeException even when it's under the 1MB max batch size limit.
Reviewers: David Jacot <djacot@confluent.io>
This patch aims to remove the data structure that stores the conversion from topic names to topic ids which was taking time similar to the actual assignment computation. Instead, we reuse the already existing ConsumerGroupMember.subscribedTopicNames() and do the conversion to topic ids when the iterator is requested.
Reviewers: David Jacot <djacot@confluent.io>
This patch implements the heartbeat api to the members that use the classic protocol in a ConsumerGroup.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
DelayEventAccumulator should return immediately if there are no events in the queue. Also removed some unused fields inside EventProcessorThread.
Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
We have discovered during large scale performance tests that the current write path of the new coordinator does not scale well. The issue is that each write operation writes synchronously from the coordinator threads. Coalescing records into bigger batches helps drastically because it amortizes the cost of writes. Aligning the batches with the snapshots of the timelines data structures also reduces the number of in-flight snapshots.
This patch is the first of a series of patches that will bring records coalescing into the coordinator runtime. As a first step, we had to rework the PartitionWriter interface and move the logic to build MemoryRecords from it to the CoordinatorRuntime. The main changes are in these two classes. The others are related mechanical changes.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch implements the sync group api for the consumer groups that are in the mixed mode. In classicGroupSyncToConsumerGroup, the assignedPartitions calculated in the JoinGroup will be returned as the assignment in the sync response and the member session timeout will be rescheduled.
Reviewers: David Jacot <djacot@confluent.io>
This patch removes the usage of the Stream API in CoordinatorRecordHelpers. I saw it in a couple of profiles so it is better to remove it.
Reviewers: Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
The heartbeat api to the consumer group with classic protocol members schedules the session timeout. At present, there's no way to get the classic member session timeout in heartbeat to consumer group.
This patch stores the session timeout into the ClassicMemberMetadata in ConsumerGroupMemberMetadataValue and update it when it's provided in the join request.
Reviewers: David Jacot <djacot@confluent.io>
This patch deprecates `offsets.commit.required.acks` in Apache Kafka 3.8 as described in KIP-1041: https://cwiki.apache.org/confluence/x/9YobEg.
Reviewers: Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Write events create and add a TimerTask to schedule the timeout operation. The issue is that we pile up the number of timer tasks which are essentially no-ops if replication was successful. They stay in memory for 15 seconds (default write timeout) and as the rate of write increases, the impact on memory usage increases.
Instead, cancel the corresponding write timeout task when the write event is committed to the log. This also applies to complete transaction events.
Reviewers: David Jacot <djacot@confluent.io>
The patch implements JoinGroup API for the new consumer groups. It allow members using the classic rebalance protocol with the consumer embedded protocol to join a new consumer group.
Reviewers: David Jacot <djacot@confluent.io>
ConsumerGroupPartitionMetadataValue.Epoch is not used anywhere so we can remove it. Note that we already have non-backward compatible changes lined up for 3.8 so it is fine to do it.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
While reviewing https://github.com/apache/kafka/pull/15785, I noticed that the member is added to the group directly in `ConsumerGroup#getOrMaybeCreateMember`. This does not hurt but confuses people because the state must not be mutated at this point. It should only be mutated when records are replayed. I think that it is better to remove it in order to make it clear.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
We are seeing flaky test in `testConsumerGroupHeartbeatWithStableClassicGroup` where the error is caused by the different ordering in the expected and actual values. The patch sorts the topic partition list in the records to fix the issue.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Igor Soarez <soarez@apple.com>, David Jacot <djacot@confluent.io>
Online downgrade from a consumer group to a classic group is triggered when the last consumer that uses the consumer protocol leaves the group. A rebalance is manually triggered after the group conversion. This patch adds consumer group downgrade validation and conversion.
Reviewers: David Jacot <djacot@confluent.io>
This patch introduces the conversion from a classic group to a consumer group when a member joins with the new consumer group protocol (epoch is 0) but only if the conversion is enabled.
Reviewers: David Jacot <djacot@confluent.io>
This patch adds the `group.consumer.migration.policy` config which controls how consumer groups can be converted from classic group to consumer group and vice versa. The config is kept as an internal one while we develop the feature.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
When implementing the group type conversion from a classic group to a consumer group, if the replay of conversion records fails, the group should be reverted back including its timeouts.
A possible solution is to keep all the classic group timeouts and add a type check to the timeout operations. If the group is successfully upgraded, it won't be able to pass the type check and its operations will be executed without actually doing anything; if the group upgrade fails, the group map will be reverted and the timeout operations will be executed as is.
We've already have group type check in consumer group timeout operations. This patch adds similar type check to those classic group timeout operations.
Reviewers: David Jacot <djacot@confluent.io>
- Use `Empty` instead of 'none' when referring to `Optional` values.
- `Headers.lastHeader` returns `null` when no header is found.
- Fix minor spelling mistakes.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
KIP-890 Part 1 introduced verification of transactions with the
transaction coordinator on the `Produce` and `TxnOffsetCommit` paths.
This introduced the possibility of new errors when responding to those
requests. For backwards compatibility with older clients, a choice was
made to convert some of the new retriable errors to existing errors that
are expected and retried correctly by older clients.
`NETWORK_EXCEPTION` was forgotten about and not converted, but can occur
if, for example, the transaction coordinator is temporarily refusing
connections. Now, we convert it to:
* `NOT_ENOUGH_REPLICAS` on the `Produce` path, just like the other
retriable errors that can arise from transaction verification.
* `COORDINATOR_LOAD_IN_PROGRESS` on the `TxnOffsetCommit` path. This
error does not force coordinator lookup on clients, unlike
`COORDINATOR_NOT_AVAILABLE`. Note that this deviates from KIP-890,
which says that retriable errors should be converted to
`COORDINATOR_NOT_AVAILABLE`.
Reviewers: Artem Livshits <alivshits@confluent.io>, David Jacot <djacot@confluent.io>, Justine Olshan <jolshan@confluent.io>