I have noticed the following log when a __consumer_offsets partition immigrate from a broker. It appends because the event is queued up after the event that unloads the state machine. This patch fixes it and fixes another similar one.
```
[2024-02-06 17:14:51,359] ERROR [GroupCoordinator id=1] Execution of UpdateImage(tp=__consumer_offsets-28, offset=13251) failed due to This is not the correct coordinator.. (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)
org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
```
Reviewers: Justine Olshan <jolshan@confluent.io>
`poll(long timeout, TimeUnit unit)` is either used with `Long.MAX_VALUE` or `0`. This patch replaces it with `poll` and `take`. It removes the `awaitNanos` usage.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
Spotted the following log:
```
[2024-02-14 09:59:30,103] INFO [GroupCoordinator id=1] Finished loading of metadata from 39 in __consumer_offsets-4ms with epoch 2 where 39ms was spent in the scheduler. Loaded 0 records which total to 0 bytes. (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)
```
The partition and the time are incorrect. This patch fixes it.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Bruno Cadonna <bruno@confluent.io>
`GroupMetadataManagerTest` class got a little under control. We have too many things defined in it. As a first steps, this patch extracts all the inner classes. It also extracts all the helper methods. However, the logic is not changed at all.
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Justine Olshan <jolshan@confluent.io>
`signalAll` was mistakenly used instead of `signal` when a key become available in the `EventAccumulator`. The fix relies on existing tests.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch adds the support for filtering groups by types (Classic or Consumer) to both the old and the new group coordinators.
Reviewers: David Jacot <djacot@confluent.io>
This patch implements `GroupCoordinator.onPartitionsDeleted` that is called whenever a partition is deleted and must deleted all the offsets related to them. The patch uses a naive approach similar to the one used in the old coordinator. It basically iterates over all the regular end pending offsets and deletes the ones matching the deleted partition set.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch uniformizes the error handling in the GroupCoordinatorService with the aim to reuse the same error translation for all operations. It also ensures that exceptions are unwrapped if needed.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
While using —list —state the current accepted values correspond to the classic group type states. This patch adds the new states introduced by KIP-848. It also make the matching on the server case insensitive.
Co-authored-by: d00791190 <dinglan6@huawei.com>
Reviewers: Ritika Reddy <rreddy@confluent.io>, David Jacot <djacot@confluent.io>
This is the last patch to complete the implementation of the transactional offsets. This patch updates the following paths:
* delete offsets - the patch ensures that a tombstone is written for pending transactional offsets too.
* delete all offsets - the patch ensures that all pending transactional offsets are deleted too.
* expire offsets - the patch ensures that an offset for a partition is not expire is there is a pending transaction.
* replay offset record - the patch ensures that all pending transactional offsets are removed when a tombstone is received.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Dongnuo Lyu <dlyu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
The patch populates the topic name of `ConsumerGroupDescribeResponseData.TopicPartitions` with the corresponding topic id in `ConsumerGroupDescribe`.
Reviewers: David Jacot <djacot@confluent.io>
KIP-714 requires client instance cache in broker which should also have a time-based eviction policy where client instances which are not actively sending metrics should be evicted. KIP mentions This client instance specific state is maintained in broker memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
When transactional offset commits are eventually committed, we must always keep the most recent committed when we have a mix of transactional and regular offset commits. We achieve this by storing the offset of the offset commit record along side the committed offset in memory. Without preserving information of the commit record offset, compaction of the __consumer_offsets topic itself may result in the wrong offset commit being materialized.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk> , David Jacot <djacot@confluent.io>, Nikolay <NIzhikov@gmail.com>
We had a case where a partition got assigned to two members and we found a bug in the partition epochs bookkeeping. Basically, when a member has a partition pending revocation re-assigned to him before the revocation is completed, the partition epoch is lost. Here is an example of such transition:
```
[2024-01-16 12:10:52,613] INFO [GroupCoordinator id=1 topic=__consumer_offsets partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member M2 transitioned from CurrentAssignment(memberEpoch=11, previousMemberEpoch=9, targetMemberEpoch=14, state=revoking, assignedPartitions={}, partitionsPendingRevocation={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 3, 4, 5, 6, 7]}, partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[0, 5]}) to CurrentAssignment(memberEpoch=15, previousMemberEpoch=11, targetMemberEpoch=15, state=stable, assignedPartitions={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 3, 4, 5, 6, 7]}, partitionsPendingRevocation={}, partitionsPendingAssignment={}). (org.apache.kafka.coordinator.group.GroupMetadataManager)
```
This patch fixes the bug and also strengthen the partition epochs bookkeeping to not accept such invalid transitions.
Reviewers: Justine Olshan <jolshan@confluent.io>
The current load summary exposes the time from when the partition load operation is scheduled to when the load completes. We are missing the information of how long the scheduled operation stays in the scheduler. Log that information.
Reviewers: David Jacot <djacot@confluent.io>
This PR creates MetadataVersion.latestTesting to represent the highest metadata version (which may be unstable) and MetadataVersion.latestProduction to represent the latest version that should be used in production. It fixes a few cases where the broker was advertising that it supported the testing versions even when unstable metadata versions had not been configured.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
When a replica is deleted, the unloading procedure of the coordinator is called with an empty leader epoch. However, the current implementation of the new group coordinator throws an exception in this case. My bad. This patch updates the logic to handle it correctly.
We discovered the bug in our testing environment. We will add a system test or an integration test in a subsequent patch to better exercise this path.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch adds `UNSTABLE_OFFSET_COMMIT` errors support in the new group coordinator. `UNSTABLE_OFFSET_COMMIT` errors for partitions with unstable offset commits. Here unstable means that there are ongoing transactions.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch wires the transaction verification in the new group coordinator. It basically calls the verification path before scheduling the write operation. If the verification fails, the error is returned to the caller.
Note that the patch uses `appendForGroup`. I suppose that we will move away from using it when https://github.com/apache/kafka/pull/15087 is merged.
Reviewers: Justine Olshan <jolshan@confluent.io>
When a retryable write operation fails, we retry with the default 500ms backoff. If a custom retry backoff was used to originally schedule the operation, we should retry with the same custom backoff instead of the default.
Reviewers: David Jacot <djacot@confluent.io>
Remove "group-rebalance-rate" and "group-rebalance-count" metrics from the new coordinator as this is not part of KIP-848.
Reviewers: David Jacot <djacot@confluent.io>
People has raised concerned about using `Generic` as a name to designate the old rebalance protocol. We considered using `Legacy` but discarded it because there are still applications, such as Connect, using the old protocol. We settled on using `Classic` for the `Classic Rebalance Protocol`.
The changes in this patch are extremely mechanical. It basically replaces the occurrences of `generic` by `classic`.
Reviewers: Divij Vaidya <diviv@amazon.com>, Lucas Brutschy <lbrutschy@confluent.io>
This patch wires the handling of makers written by the transaction coordinator via the WriteTxnMarkers API. In the old group coordinator, the markers are written to the logs and the group coordinator is informed to materialize the changes as a second step if the writes were successful. This approach does not really work with the new group coordinator for mainly two reasons: 1) The second step would actually fail while the coordinator is loading and there is no guarantee that the loading has picked up the write or not; 2) It does not fit well with the new memory model where the state is snapshotted by offset. In both cases, it seems that having a single writer to the `__consumer_offsets` partitions is more robust and preferable.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
The new coordinator stops loading if the partition goes offline during load. However, the partition is still considered active. Instead, we should return NOT_LEADER_OR_FOLLOWER exception during load.
Another change is that we only want to invoke CoordinatorPlayback#updateLastCommittedOffset if the current offset (last written offset) is greater than or equal to the current high watermark. This is to ensure that in the case the high watermark is ahead of the current offset, we don't clear snapshots prematurely.
Reviewers: David Jacot <djacot@confluent.io>
This was missing from https://issues.apache.org/jira/browse/KAFKA-14500. The existing coordinator transforms the log append error before returning to client. Apply the same transformation.
Reviewers: David Jacot <djacot@confluent.io>
This patch ensure that `offset.commit.timeout.ms` is enforced. It does so by adding a timeout to the CoordinatorWriteEvent.
Reviewers: David Jacot <djacot@confluent.io>
Currently, we increment generic group metrics whenever we create a new Group object when we load a partition. This is incorrect as the partition may contain several records for the same group if in the active segment or if the segment has not yet been compacted.
The same applies to removing groups; we can possibly have multiple group tombstone records. Instead, only increment the metric if we created a new group and only decrement the metric if the group exists.
Reviewers: David Jacot <djacot@confluent.io>
The support for regular expressions has not been implemented yet in the new consumer group protocol. This patch removes the `SubscribedTopicRegex` from the `ConsumerGroupHeartbeatRequest` in preparation for 3.7. It seems better to bump the version and add it back when we implement the feature, as part of https://issues.apache.org/jira/browse/KAFKA-14517, instead of having an unused field in the request.
Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Justine Olshan <jolshan@confluent.io>
This patch implements the TxnOffsetCommit API. When a transactional offset commit is received, it is stored in the pending transactional offsets structure and waits there until the transaction is committed or aborted. Note that the handling of the transaction completion is not implemented in this patch.
Reviewers: Justine Olshan <jolshan@confluent.io>
After the new coordinator loads a __consumer_offsets partition, it logs the following exception when making a read operation (fetch/list groups, etc):
```
java.lang.RuntimeException: No in-memory snapshot for epoch 740745. Snapshot epochs are:
at org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:178)
at org.apache.kafka.timeline.SnapshottableHashTable.snapshottableIterator(SnapshottableHashTable.java:407)
at org.apache.kafka.timeline.TimelineHashMap$ValueIterator.<init>(TimelineHashMap.java:283)
at org.apache.kafka.timeline.TimelineHashMap$Values.iterator(TimelineHashMap.java:271)
```
This happens because we don't have a snapshot at the last updated high watermark after loading. We cannot generate a snapshot at the high watermark after loading all batches because it may contain records that have not yet been committed. We also don't know where the high watermark will advance up to so we need to generate a snapshot for each offset the loader observes to be greater than the current high watermark. Then once we add the high watermark listener and update the high watermark we can delete all of the older snapshots.
Reviewers: David Jacot <djacot@confluent.io>
This patch adds the `Uniform` assignor to the default list of supported assignors. It also do small changes in the code.
Reviewers: Justine Olshan <jolshan@confluent.io>
The group expiration log becomes noisy when we encounter a retry-able error as the retry backoff is fixed to 500 ms. Allow customizable retry backoff so that even in the case of failure we have a longer delay. The current default for offsetsRetentionCheckIntervalMs is set to 10 minutes so even if the operation fails we will "retry" after 10 minutes.
Reviewers: David Jacot <djacot@confluent.io>
This patch adds support for transactional writes to the CoordinatorRuntime framework. This mainly consists in adding CoordinatorRuntime#scheduleTransactionalWriteOperation and in adding the producerId and producerEpoch to various interfaces. The patch also extends the CoordinatorLoaderImpl and the CoordinatorPartitionWriter accordingly.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch add the support for static membership to the new consumer group protocol. With a static member can join, re-join, temporarily leave and leave. When a member leaves with the expectation to rejoin, it must rejoin within the session timeout. It is kicks out from the consumer group otherwise.
Reviewers: David Jacot <djacot@confluent.io>
This patch updates the testing framework to support running tests with kraft and the new group coordinator introduced in the context of KIP-848. This can be done by using `kraft+kip-848` as a quorum. Note that this is temporary until we make it the default and only option in 4.0. To verify this, this patch also enables kraft and kraft+kip-848 in PlaintextConsumerTest and its parent classes.
Reviewers: David Jacot <djacot@confluent.io>
This patch adds the second part of the Uniform Assignor, used when the subscriptions of each member in a consumer group are different.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
This patch copy over existing metrics and add new consumer group metrics to the new GroupCoordinatorService.
Now that each coordinator is responsible for a topic partition, this patch introduces a GroupCoordinatorMetrics that records gauges for global metrics such as the number of generic groups in PreparingRebalance state, etc. For GroupCoordinatorShard specific metrics, GroupCoordinatorMetrics will activate new GroupCoordinatorMetricsShards that will be responsible for incrementing/decrementing TimelineLong objects and then aggregate the total amount across all shards.
As the CoordinatorRuntime/CoordinatorShard does not care about group metadata, we have introduced a CoordinatorMetrics.java/CoordinatorMetricsShard.java so that in the future transaction coordinator metrics can also be onboarded in a similar fashion.
Main files to look at:
GroupCoordinatorMetrics.java
GroupCoordinatorMetricsShard.java
CoordinatorMetrics.java
CoordinatorMetricsShard.java
CoordinatorRuntime.java
Metrics to add after #14408 is merged:
offset deletions sensor (OffsetDeletions); Meter(offset-deletion-rate, offset-deletion-count)
Metrics to add after https://issues.apache.org/jira/browse/KAFKA-14987 is merged:
offset expired sensor (OffsetExpired); Meter(offset-expiration-rate, offset-expiration-count)
Reviewers: Justine Olshan <jolshan@confluent.io>
When the group coordinator does not host any __consumer_offsets partitions, the existing ListGroup implementation won't schedule any operation, thus a `new CompletableFuture<>()` is returned directly and never gets completed. This patch fixes the issue.
Reviewers: David Jacot <djacot@confluent.io>
In `KafkaApis.scala`, we build the API response differently if exceptions are thrown during the API execution. Since the new group coordinator only populates the response with error code instead of throwing an exception when an error occurs, there may be different behavior between the existing group coordinator and the new one.
This patch:
- Fixes the response building in `KafkaApis.scala` for the two APIs affected by such difference -- OffsetFetch and OffsetDelete.
- In `GroupCoordinatorService.java`, returns a response with error code instead of a failed future when the coordinator is not active.
Reviewers: David Jacot <djacot@confluent.io>