From 1a106e453872403ae6803a3ca30d7f8b379ae26e Mon Sep 17 00:00:00 2001 From: Dongnuo Lyu <139248811+dongnuo123@users.noreply.github.com> Date: Mon, 3 Feb 2025 10:50:21 -0500 Subject: [PATCH] KAFKA-18655: Implement the consumer group size counter with scheduled task (#18717) During testing we discovered that the empty group count is not updated in group conversion, but when the new group transitions to another state, the empty group count is decremented. This could result in a negative empty group count. We can have a new consumer group count implementation that follows the pattern we used for the classic group count. The timeout task periodically refreshes the metrics based on the current groups' soft state. Reviewers: Jeff Kim --- .../group/GroupCoordinatorShard.java | 24 +-- .../group/GroupMetadataManager.java | 43 ++--- .../metrics/GroupCoordinatorMetricsShard.java | 117 ++------------ .../group/modern/consumer/ConsumerGroup.java | 2 - .../group/GroupCoordinatorShardTest.java | 14 +- .../group/GroupMetadataManagerTest.java | 148 ++++++------------ .../GroupCoordinatorMetricsShardTest.java | 124 --------------- .../metrics/GroupCoordinatorMetricsTest.java | 9 +- .../modern/consumer/ConsumerGroupTest.java | 43 ----- 9 files changed, 107 insertions(+), 417 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 85fb54761b6..86c7e8abba7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -258,11 +258,11 @@ public class GroupCoordinatorShard implements CoordinatorShard { - groupMetadataManager.updateClassicGroupSizeCounter(); - scheduleClassicGroupSizeCounter(); + groupMetadataManager.updateGroupSizeCounter(); + scheduleGroupSizeCounter(); return 
GroupMetadataManager.EMPTY_RESULT; } ); } /** - * Cancels the group size counter for the classic groups. + * Cancels the group size counter for the classic/consumer groups. */ - private void cancelClassicGroupSizeCounter() { - timer.cancel(CLASSIC_GROUP_SIZE_COUNTER_KEY); + private void cancelGroupSizeCounter() { + timer.cancel(GROUP_SIZE_COUNTER_KEY); } /** @@ -736,7 +736,7 @@ public class GroupCoordinatorShard implements CoordinatorShard groupSizeCounter = new HashMap<>(); + public void updateGroupSizeCounter() { + Map classicGroupSizeCounter = new HashMap<>(); + Map consumerGroupSizeCounter = new HashMap<>(); groups.forEach((__, group) -> { - if (group.type() == CLASSIC) { - groupSizeCounter.compute(((ClassicGroup) group).currentState(), Utils::incValue); + switch (group.type()) { + case CLASSIC: + classicGroupSizeCounter.compute(((ClassicGroup) group).currentState(), Utils::incValue); + break; + case CONSUMER: + consumerGroupSizeCounter.compute(((ConsumerGroup) group).state(), Utils::incValue); + break; + default: + break; } }); - metrics.setClassicGroupGauges(groupSizeCounter); + metrics.setClassicGroupGauges(classicGroupSizeCounter); + metrics.setConsumerGroupGauges(consumerGroupSizeCounter); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java index 219a4f0a22c..1ed75229f58 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java @@ -71,7 +71,7 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { /** * Consumer group size gauge counters keyed by the metric name. 
*/ - private final Map consumerGroupGauges; + private volatile Map consumerGroupGauges; /** * Share group size gauge counters keyed by the metric name. @@ -108,19 +108,7 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { numClassicGroupsTimelineCounter = new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)); this.classicGroupGauges = Collections.emptyMap(); - - this.consumerGroupGauges = Utils.mkMap( - Utils.mkEntry(ConsumerGroupState.EMPTY, - new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))), - Utils.mkEntry(ConsumerGroupState.ASSIGNING, - new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))), - Utils.mkEntry(ConsumerGroupState.RECONCILING, - new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))), - Utils.mkEntry(ConsumerGroupState.STABLE, - new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))), - Utils.mkEntry(ConsumerGroupState.DEAD, - new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))) - ); + this.consumerGroupGauges = Collections.emptyMap(); this.shareGroupGauges = Utils.mkMap( Utils.mkEntry(ShareGroup.ShareGroupState.EMPTY, @@ -145,17 +133,15 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { } /** - * Increment the number of consumer groups. + * Set the number of consumer groups. + * This method should be the only way to update the map and is called by the scheduled task + * that updates the metrics in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}. + * Breaking this will result in inconsistent behavior. * - * @param state the consumer group state. + * @param consumerGroupGauges The map counting the number of consumer groups in each state. 
*/ - public void incrementNumConsumerGroups(ConsumerGroupState state) { - TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state); - if (gaugeCounter != null) { - synchronized (gaugeCounter.timelineLong) { - gaugeCounter.timelineLong.increment(); - } - } + public void setConsumerGroupGauges(Map consumerGroupGauges) { + this.consumerGroupGauges = consumerGroupGauges; } /** @@ -167,20 +153,6 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { } } - /** - * Decrement the number of consumer groups. - * - * @param state the consumer group state. - */ - public void decrementNumConsumerGroups(ConsumerGroupState state) { - TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state); - if (gaugeCounter != null) { - synchronized (gaugeCounter.timelineLong) { - gaugeCounter.timelineLong.decrement(); - } - } - } - /** * @return The number of offsets. */ @@ -219,9 +191,9 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { * @return The number of consumer groups in `state`. 
*/ public long numConsumerGroups(ConsumerGroupState state) { - TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state); - if (gaugeCounter != null) { - return gaugeCounter.atomicLong.get(); + Long counter = consumerGroupGauges.get(state); + if (counter != null) { + return counter; } return 0L; } @@ -231,7 +203,7 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { */ public long numConsumerGroups() { return consumerGroupGauges.values().stream() - .mapToLong(timelineGaugeCounter -> timelineGaugeCounter.atomicLong.get()).sum(); + .mapToLong(Long::longValue).sum(); } @Override @@ -257,14 +229,6 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { @Override public void commitUpTo(long offset) { - this.consumerGroupGauges.forEach((__, gaugeCounter) -> { - long value; - synchronized (gaugeCounter.timelineLong) { - value = gaugeCounter.timelineLong.get(offset); - } - gaugeCounter.atomicLong.set(value); - }); - synchronized (numClassicGroupsTimelineCounter.timelineLong) { long value = numClassicGroupsTimelineCounter.timelineLong.get(offset); numClassicGroupsTimelineCounter.atomicLong.set(value); @@ -286,8 +250,11 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { /** * Sets the classicGroupGauges. + * This method should be the only way to update the map and is called by the scheduled task + * that updates the metrics in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}. + * Breaking this will result in inconsistent behavior. * - * @param classicGroupGauges The new classicGroupGauges. + * @param classicGroupGauges The map counting the number of classic groups in each state. */ public void setClassicGroupGauges( Map classicGroupGauges @@ -295,56 +262,6 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { this.classicGroupGauges = classicGroupGauges; } - /** - * Called when a consumer group's state has changed. 
Increment/decrement - * the counter accordingly. - * - * @param oldState The previous state. null value means that it's a new group. - * @param newState The next state. null value means that the group has been removed. - */ - public void onConsumerGroupStateTransition( - ConsumerGroupState oldState, - ConsumerGroupState newState - ) { - if (newState != null) { - switch (newState) { - case EMPTY: - incrementNumConsumerGroups(ConsumerGroupState.EMPTY); - break; - case ASSIGNING: - incrementNumConsumerGroups(ConsumerGroupState.ASSIGNING); - break; - case RECONCILING: - incrementNumConsumerGroups(ConsumerGroupState.RECONCILING); - break; - case STABLE: - incrementNumConsumerGroups(ConsumerGroupState.STABLE); - break; - case DEAD: - incrementNumConsumerGroups(ConsumerGroupState.DEAD); - } - } - - if (oldState != null) { - switch (oldState) { - case EMPTY: - decrementNumConsumerGroups(ConsumerGroupState.EMPTY); - break; - case ASSIGNING: - decrementNumConsumerGroups(ConsumerGroupState.ASSIGNING); - break; - case RECONCILING: - decrementNumConsumerGroups(ConsumerGroupState.RECONCILING); - break; - case STABLE: - decrementNumConsumerGroups(ConsumerGroupState.STABLE); - break; - case DEAD: - decrementNumConsumerGroups(ConsumerGroupState.DEAD); - } - } - } - public void incrementNumShareGroups(ShareGroup.ShareGroupState state) { TimelineGaugeCounter gaugeCounter = shareGroupGauges.get(state); if (gaugeCounter != null) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java index 58d70ffe99a..16e0d0fae4a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java @@ -885,7 +885,6 @@ public class ConsumerGroup extends ModernGroup { @Override protected 
void maybeUpdateGroupState() { - ConsumerGroupState previousState = state.get(); ConsumerGroupState newState = STABLE; if (members.isEmpty()) { newState = EMPTY; @@ -901,7 +900,6 @@ public class ConsumerGroup extends ModernGroup { } state.set(newState); - metrics.onConsumerGroupStateTransition(previousState, newState); } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 4648f35670b..ea0c2b02e4d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -79,9 +79,9 @@ import java.util.List; import java.util.Set; import static org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext; -import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.CLASSIC_GROUP_SIZE_COUNTER_KEY; import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS; import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_EXPIRATION_KEY; +import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_SIZE_COUNTER_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -1024,7 +1024,7 @@ public class GroupCoordinatorShardTest { } @Test - public void testScheduleClassicGroupSizeCounter() { + public void testScheduleGroupSizeCounter() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); @@ -1046,21 +1046,21 @@ public class GroupCoordinatorShardTest { ); 
coordinator.onLoaded(MetadataImage.EMPTY); - // The classic group size counter is scheduled. + // The counter is scheduled. assertEquals( DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS, - timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds() + timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds() ); // Advance the timer to trigger the update. time.sleep(DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS + 1); timer.poll(); - verify(groupMetadataManager, times(1)).updateClassicGroupSizeCounter(); + verify(groupMetadataManager, times(1)).updateGroupSizeCounter(); - // The classic group size counter is scheduled. + // The counter is scheduled. assertEquals( DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS, - timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds() + timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds() ); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 4e4e7cf7645..1287f121327 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -3589,42 +3589,71 @@ public class GroupMetadataManagerTest { } @Test - public void testUpdateClassicGroupSizeCounter() { - String groupId0 = "group-0"; - String groupId1 = "group-1"; - String groupId2 = "group-2"; - String groupId3 = "group-3"; - String groupId4 = "group-4"; + public void testUpdateGroupSizeCounter() { + List groupIds = new ArrayList<>(); + IntStream.range(0, 8).forEach(i -> groupIds.add("group-" + i)); + List consumerMemberIds = List.of("consumer-member-id-0", "consumer-member-id-1", "consumer-member-id-2"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withConsumerGroup(new 
ConsumerGroupBuilder(groupId0, 10)) + .withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(0), 10)) // Empty group + .withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(1), 10) // Stable group + .withAssignmentEpoch(10) + .withMember(new ConsumerGroupMember.Builder(consumerMemberIds.get(0)) + .setMemberEpoch(10) + .build())) + .withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(2), 10) // Assigning group + .withAssignmentEpoch(9) + .withMember(new ConsumerGroupMember.Builder(consumerMemberIds.get(1)) + .setMemberEpoch(9) + .build())) + .withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(3), 10) // Reconciling group + .withAssignmentEpoch(10) + .withMember(new ConsumerGroupMember.Builder(consumerMemberIds.get(2)) + .setMemberEpoch(9) + .build())) .build(); - ClassicGroup group1 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId1, true); - ClassicGroup group2 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId2, true); - ClassicGroup group3 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId3, true); - ClassicGroup group4 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId4, true); + ClassicGroup group4 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(4), true); + ClassicGroup group5 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(5), true); + ClassicGroup group6 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(6), true); + ClassicGroup group7 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(7), true); - context.groupMetadataManager.updateClassicGroupSizeCounter(); + context.groupMetadataManager.updateGroupSizeCounter(); verify(context.metrics, times(1)).setClassicGroupGauges(eq(Utils.mkMap( Utils.mkEntry(ClassicGroupState.EMPTY, 4L) ))); + verify(context.metrics, times(1)).setConsumerGroupGauges(eq(Utils.mkMap( + Utils.mkEntry(ConsumerGroup.ConsumerGroupState.EMPTY, 1L), + 
Utils.mkEntry(ConsumerGroup.ConsumerGroupState.ASSIGNING, 1L), + Utils.mkEntry(ConsumerGroup.ConsumerGroupState.RECONCILING, 1L), + Utils.mkEntry(ConsumerGroup.ConsumerGroupState.STABLE, 1L) + ))); - group1.transitionTo(PREPARING_REBALANCE); - group2.transitionTo(PREPARING_REBALANCE); - group2.transitionTo(COMPLETING_REBALANCE); - group3.transitionTo(PREPARING_REBALANCE); - group3.transitionTo(COMPLETING_REBALANCE); - group3.transitionTo(STABLE); - group4.transitionTo(DEAD); + group4.transitionTo(PREPARING_REBALANCE); + group5.transitionTo(PREPARING_REBALANCE); + group5.transitionTo(COMPLETING_REBALANCE); + group6.transitionTo(PREPARING_REBALANCE); + group6.transitionTo(COMPLETING_REBALANCE); + group6.transitionTo(STABLE); + group7.transitionTo(DEAD); - context.groupMetadataManager.updateClassicGroupSizeCounter(); + context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupIds.get(1), false, Collections.emptyList()) + .removeMember(consumerMemberIds.get(0)); + context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupIds.get(3), false, Collections.emptyList()) + .updateMember(new ConsumerGroupMember.Builder(consumerMemberIds.get(2)).setMemberEpoch(10).build()); + + context.groupMetadataManager.updateGroupSizeCounter(); verify(context.metrics, times(1)).setClassicGroupGauges(eq(Utils.mkMap( Utils.mkEntry(ClassicGroupState.PREPARING_REBALANCE, 1L), Utils.mkEntry(ClassicGroupState.COMPLETING_REBALANCE, 1L), Utils.mkEntry(ClassicGroupState.STABLE, 1L), Utils.mkEntry(ClassicGroupState.DEAD, 1L) ))); + verify(context.metrics, times(1)).setConsumerGroupGauges(eq(Utils.mkMap( + Utils.mkEntry(ConsumerGroup.ConsumerGroupState.EMPTY, 2L), + Utils.mkEntry(ConsumerGroup.ConsumerGroupState.ASSIGNING, 1L), + Utils.mkEntry(ConsumerGroup.ConsumerGroupState.STABLE, 1L) + ))); } @Test @@ -9548,75 +9577,6 @@ public class GroupMetadataManagerTest { verify(context.metrics).record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); } - @Test - public void 
testOnClassicGroupStateTransitionOnLoading() { - GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .build(); - - ClassicGroup group = new ClassicGroup( - new LogContext(), - "group-id", - EMPTY, - context.time - ); - - // Even if there are more group metadata records loaded than tombstone records, the last replayed record - // (tombstone in this test) is the latest state of the group. Hence, the overall metric count should be 0. - IntStream.range(0, 5).forEach(__ -> - context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, Collections.emptyMap())) - ); - IntStream.range(0, 4).forEach(__ -> - context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id")) - ); - } - - @Test - public void testOnConsumerGroupStateTransition() { - GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .build(); - - // Replaying a consumer group epoch record should increment metric. - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("group-id", 1)); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(null, ConsumerGroup.ConsumerGroupState.EMPTY); - - // Replaying a consumer group epoch record for a group that has already been created should not increment metric. - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("group-id", 1)); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(null, ConsumerGroup.ConsumerGroupState.EMPTY); - - // Creating and replaying tombstones for a group should remove group and decrement metric. 
- List tombstones = new ArrayList<>(); - Group group = context.groupMetadataManager.group("group-id"); - group.createGroupTombstoneRecords(tombstones); - tombstones.forEach(context::replay); - assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.group("group-id")); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); - - // Replaying a tombstone for a group that has already been removed should not decrement metric. - tombstones.forEach(context::replay); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); - } - - @Test - public void testOnConsumerGroupStateTransitionOnLoading() { - GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .build(); - - // Even if there are more group epoch records loaded than tombstone records, the last replayed record - // (tombstone in this test) is the latest state of the group. Hence, the overall metric count should be 0. 
- IntStream.range(0, 5).forEach(__ -> - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("group-id", 0)) - ); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id")); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id")); - IntStream.range(0, 3).forEach(__ -> { - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id")); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id")); - }); - - verify(context.metrics, times(1)).onConsumerGroupStateTransition(null, ConsumerGroup.ConsumerGroupState.EMPTY); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); - } - @Test public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() { String classicGroupId = "classic-group-id"; @@ -11153,8 +11113,6 @@ public class GroupMetadataManagerTest { result.records() ); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null); - // The new classic member 1 has a heartbeat timeout. ScheduledTimeout heartbeatTimeout = context.timer.timeout( classicGroupHeartbeatKey(groupId, memberId1) @@ -11340,8 +11298,6 @@ public class GroupMetadataManagerTest { timeout.result.records() ); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null); - // The new classic member 1 has a heartbeat timeout. ScheduledTimeout heartbeatTimeout = context.timer.timeout( classicGroupHeartbeatKey(groupId, memberId1) @@ -11545,8 +11501,6 @@ public class GroupMetadataManagerTest { timeout.result.records() ); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.RECONCILING, null); - // The new classic member 1 has a heartbeat timeout. 
ScheduledTimeout heartbeatTimeout = context.timer.timeout( classicGroupHeartbeatKey(groupId, memberId1) @@ -11780,8 +11734,6 @@ public class GroupMetadataManagerTest { result.records ); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null); - // The new classic member 1 has a heartbeat timeout. ScheduledTimeout heartbeatTimeout = context.timer.timeout( classicGroupHeartbeatKey(groupId, memberId1) @@ -14298,8 +14250,6 @@ public class GroupMetadataManagerTest { leaveResult.records() ); - verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null); - // The new classic member 1 has a heartbeat timeout. ScheduledTimeout heartbeatTimeout = context.timer.timeout( classicGroupHeartbeatKey(groupId, memberId1) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java index 06ffeaa84d1..7dc3a4d17f6 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java @@ -17,22 +17,15 @@ package org.apache.kafka.coordinator.group.metrics; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.Uuid; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; -import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.timeline.SnapshotRegistry; import com.yammer.metrics.core.MetricsRegistry; import org.junit.jupiter.api.Test; -import java.util.Collections; -import java.util.stream.IntStream; - 
-import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue; import static org.junit.jupiter.api.Assertions.assertEquals; public class GroupCoordinatorMetricsShardTest { @@ -47,135 +40,18 @@ public class GroupCoordinatorMetricsShardTest { GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp); shard.incrementNumOffsets(); - shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY); - shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING); - shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING); - shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE); - shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD); snapshotRegistry.idempotentCreateSnapshot(1000); // The value should not be updated until the offset has been committed. assertEquals(0, shard.numOffsets()); - assertEquals(0, shard.numConsumerGroups()); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD)); shard.commitUpTo(1000); assertEquals(1, shard.numOffsets()); - assertEquals(5, shard.numConsumerGroups()); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING)); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING)); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE)); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD)); shard.decrementNumOffsets(); - 
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY); - shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING); - shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING); - shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE); - shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD); snapshotRegistry.idempotentCreateSnapshot(2000); shard.commitUpTo(2000); assertEquals(0, shard.numOffsets()); - assertEquals(0, shard.numConsumerGroups()); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE)); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD)); - } - - @Test - public void testConsumerGroupStateTransitionMetrics() { - MetricsRegistry registry = new MetricsRegistry(); - Metrics metrics = new Metrics(); - SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - TopicPartition tp = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0); - GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics); - GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp); - coordinatorMetrics.activateMetricsShard(shard); - - ConsumerGroup group0 = new ConsumerGroup( - snapshotRegistry, - "group-0", - shard - ); - ConsumerGroup group1 = new ConsumerGroup( - snapshotRegistry, - "group-1", - shard - ); - ConsumerGroup group2 = new ConsumerGroup( - snapshotRegistry, - "group-2", - shard - ); - ConsumerGroup group3 = new ConsumerGroup( - snapshotRegistry, - "group-3", - shard - ); - - IntStream.range(0, 4).forEach(__ -> 
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - - snapshotRegistry.idempotentCreateSnapshot(1000); - shard.commitUpTo(1000); - assertEquals(4, shard.numConsumerGroups()); - assertEquals(4, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - - ConsumerGroupMember member0 = group0.getOrMaybeCreateMember("member-id", true); - ConsumerGroupMember member1 = group1.getOrMaybeCreateMember("member-id", true); - ConsumerGroupMember member2 = group2.getOrMaybeCreateMember("member-id", true); - ConsumerGroupMember member3 = group3.getOrMaybeCreateMember("member-id", true); - group0.updateMember(member0); - group1.updateMember(member1); - group2.updateMember(member2); - group3.updateMember(member3); - - snapshotRegistry.idempotentCreateSnapshot(2000); - shard.commitUpTo(2000); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - assertEquals(4, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE)); - - group2.setGroupEpoch(1); - group3.setGroupEpoch(1); - - snapshotRegistry.idempotentCreateSnapshot(3000); - shard.commitUpTo(3000); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - assertEquals(2, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING)); - assertEquals(2, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE)); - - group2.setTargetAssignmentEpoch(1); - - // Set member2 to ASSIGNING state. 
- new ConsumerGroupMember.Builder(member2) - .setPartitionsPendingRevocation(Collections.singletonMap(Uuid.ZERO_UUID, Collections.singleton(0))) - .build(); - - snapshotRegistry.idempotentCreateSnapshot(4000); - shard.commitUpTo(4000); - assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY)); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING)); - assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING)); - assertEquals(2, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE)); - - assertGaugeValue(metrics, metrics.metricName("group-count", "group-coordinator-metrics", - Collections.singletonMap("protocol", "consumer")), 4); - assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics", - Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.EMPTY.toString())), 0); - assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics", - Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())), 1); - assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics", - Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.RECONCILING.toString())), 1); - assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics", - Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.STABLE.toString())), 2); - assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics", - Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.DEAD.toString())), 0); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java index d04aa533873..05da88f9f7a 100644 --- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java @@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.stream.IntStream; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; @@ -181,9 +182,11 @@ public class GroupCoordinatorMetricsTest { Utils.mkEntry(ClassicGroupState.DEAD, 1L) )); - IntStream.range(0, 5).forEach(__ -> shard0.incrementNumConsumerGroups(ConsumerGroupState.ASSIGNING)); - IntStream.range(0, 5).forEach(__ -> shard1.incrementNumConsumerGroups(ConsumerGroupState.RECONCILING)); - IntStream.range(0, 3).forEach(__ -> shard1.decrementNumConsumerGroups(ConsumerGroupState.DEAD)); + shard0.setConsumerGroupGauges(Collections.singletonMap(ConsumerGroupState.ASSIGNING, 5L)); + shard1.setConsumerGroupGauges(Map.of( + ConsumerGroupState.RECONCILING, 1L, + ConsumerGroupState.DEAD, 1L + )); IntStream.range(0, 6).forEach(__ -> shard0.incrementNumOffsets()); IntStream.range(0, 2).forEach(__ -> shard1.incrementNumOffsets()); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java index c2e091aa354..886820e5513 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java @@ -81,8 +81,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import 
static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; public class ConsumerGroupTest { @@ -426,7 +424,6 @@ public class ConsumerGroupTest { @Test public void testGroupState() { - Uuid fooTopicId = Uuid.randomUuid(); ConsumerGroup consumerGroup = createConsumerGroup("foo"); assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, consumerGroup.state()); @@ -1295,46 +1292,6 @@ public class ConsumerGroupTest { assertEquals(expected, actual); } - @Test - public void testStateTransitionMetrics() { - // Confirm metrics is not updated when a new ConsumerGroup is created but only when the group transitions - // its state. - GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class); - ConsumerGroup consumerGroup = new ConsumerGroup( - new SnapshotRegistry(new LogContext()), - "group-id", - metrics - ); - - assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, consumerGroup.state()); - verify(metrics, times(0)).onConsumerGroupStateTransition(null, ConsumerGroup.ConsumerGroupState.EMPTY); - - ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") - .setMemberEpoch(1) - .setPreviousMemberEpoch(0) - .build(); - - consumerGroup.updateMember(member); - - assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, consumerGroup.state()); - verify(metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, ConsumerGroup.ConsumerGroupState.RECONCILING); - - consumerGroup.setGroupEpoch(1); - - assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, consumerGroup.state()); - verify(metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.RECONCILING, ConsumerGroup.ConsumerGroupState.ASSIGNING); - - consumerGroup.setTargetAssignmentEpoch(1); - - assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, consumerGroup.state()); - verify(metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.ASSIGNING, 
ConsumerGroup.ConsumerGroupState.STABLE); - - consumerGroup.removeMember("member"); - - assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, consumerGroup.state()); - verify(metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, ConsumerGroup.ConsumerGroupState.EMPTY); - } - @Test public void testIsInStatesCaseInsensitive() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());