KAFKA-18655: Implement the consumer group size counter with scheduled task (#18717)

During testing we discovered that the empty group count is not updated during group conversion, but when the newly converted group transitions to another state, the empty group count is decremented. This could result in a negative empty group count.

We can have a new consumer group count implementation that follows the pattern we used for the classic group count. The timeout task periodically refreshes the metrics based on the current groups' soft state.

Reviewers: Jeff Kim <jeff.kim@confluent.io>
This commit is contained in:
Dongnuo Lyu 2025-02-03 10:50:21 -05:00 committed by GitHub
parent 7fdd11295c
commit 1a106e4538
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 107 additions and 417 deletions

View File

@ -258,11 +258,11 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
static final String GROUP_EXPIRATION_KEY = "expire-group-metadata";
/**
* The classic group size counter key to schedule a timer task.
* The classic and consumer group size counter key to schedule a timer task.
*
* Visible for testing.
*/
static final String CLASSIC_GROUP_SIZE_COUNTER_KEY = "classic-group-size-counter";
static final String GROUP_SIZE_COUNTER_KEY = "group-size-counter";
/**
* Hardcoded default value of the interval to update the classic group size counter.
@ -699,27 +699,27 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
}
/**
* Schedules (or reschedules) the group size counter for the classic groups.
* Schedules (or reschedules) the group size counter for the classic/consumer groups.
*/
private void scheduleClassicGroupSizeCounter() {
private void scheduleGroupSizeCounter() {
timer.schedule(
CLASSIC_GROUP_SIZE_COUNTER_KEY,
GROUP_SIZE_COUNTER_KEY,
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
TimeUnit.MILLISECONDS,
true,
() -> {
groupMetadataManager.updateClassicGroupSizeCounter();
scheduleClassicGroupSizeCounter();
groupMetadataManager.updateGroupSizeCounter();
scheduleGroupSizeCounter();
return GroupMetadataManager.EMPTY_RESULT;
}
);
}
/**
* Cancels the group size counter for the classic groups.
* Cancels the group size counter for the classic/consumer groups.
*/
private void cancelClassicGroupSizeCounter() {
timer.cancel(CLASSIC_GROUP_SIZE_COUNTER_KEY);
private void cancelGroupSizeCounter() {
timer.cancel(GROUP_SIZE_COUNTER_KEY);
}
/**
@ -736,7 +736,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
groupMetadataManager.onLoaded();
scheduleGroupMetadataExpiration();
scheduleClassicGroupSizeCounter();
scheduleGroupSizeCounter();
}
@Override
@ -744,7 +744,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
timer.cancel(GROUP_EXPIRATION_KEY);
coordinatorMetrics.deactivateMetricsShard(metricsShard);
groupMetadataManager.onUnloaded();
cancelClassicGroupSizeCounter();
cancelGroupSizeCounter();
}
/**

View File

@ -188,7 +188,6 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABL
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.modern.ModernGroupMember.hasAssignedPartitionsChanged;
import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;
/**
* The GroupMetadataManager manages the metadata of all classic and consumer groups. It holds
@ -745,7 +744,6 @@ public class GroupMetadataManager {
if (group == null) {
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
metrics.onConsumerGroupStateTransition(null, consumerGroup.state());
return consumerGroup;
} else if (group.type() == CONSUMER) {
return (ConsumerGroup) group;
@ -756,7 +754,6 @@ public class GroupMetadataManager {
// replaying consumer group records after offset commit records would not work.
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
groups.put(groupId, consumerGroup);
metrics.onConsumerGroupStateTransition(null, consumerGroup.state());
return consumerGroup;
} else {
throw new IllegalStateException(String.format("Group %s is not a consumer group", groupId));
@ -1134,24 +1131,7 @@ public class GroupMetadataManager {
private void removeGroup(
String groupId
) {
Group group = groups.remove(groupId);
if (group != null) {
switch (group.type()) {
case CONSUMER:
ConsumerGroup consumerGroup = (ConsumerGroup) group;
metrics.onConsumerGroupStateTransition(consumerGroup.state(), null);
break;
case CLASSIC:
// The classic group size counter is implemented as scheduled task.
break;
case SHARE:
// Nothing for now, but we may want to add metrics in the future.
break;
default:
log.warn("Removed group {} with an unknown group type {}.", groupId, group.type());
break;
}
}
groups.remove(groupId);
}
/**
@ -4137,16 +4117,25 @@ public class GroupMetadataManager {
}
/**
* Counts and updates the number of classic groups in different states.
* Counts and updates the number of classic and consumer groups in different states.
*/
public void updateClassicGroupSizeCounter() {
Map<ClassicGroupState, Long> groupSizeCounter = new HashMap<>();
public void updateGroupSizeCounter() {
Map<ClassicGroupState, Long> classicGroupSizeCounter = new HashMap<>();
Map<ConsumerGroup.ConsumerGroupState, Long> consumerGroupSizeCounter = new HashMap<>();
groups.forEach((__, group) -> {
if (group.type() == CLASSIC) {
groupSizeCounter.compute(((ClassicGroup) group).currentState(), Utils::incValue);
switch (group.type()) {
case CLASSIC:
classicGroupSizeCounter.compute(((ClassicGroup) group).currentState(), Utils::incValue);
break;
case CONSUMER:
consumerGroupSizeCounter.compute(((ConsumerGroup) group).state(), Utils::incValue);
break;
default:
break;
}
});
metrics.setClassicGroupGauges(groupSizeCounter);
metrics.setClassicGroupGauges(classicGroupSizeCounter);
metrics.setConsumerGroupGauges(consumerGroupSizeCounter);
}
/**

View File

@ -71,7 +71,7 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
/**
* Consumer group size gauge counters keyed by the metric name.
*/
private final Map<ConsumerGroupState, TimelineGaugeCounter> consumerGroupGauges;
private volatile Map<ConsumerGroupState, Long> consumerGroupGauges;
/**
* Share group size gauge counters keyed by the metric name.
@ -108,19 +108,7 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
numClassicGroupsTimelineCounter = new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0));
this.classicGroupGauges = Collections.emptyMap();
this.consumerGroupGauges = Utils.mkMap(
Utils.mkEntry(ConsumerGroupState.EMPTY,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.ASSIGNING,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.RECONCILING,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.STABLE,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(ConsumerGroupState.DEAD,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)))
);
this.consumerGroupGauges = Collections.emptyMap();
this.shareGroupGauges = Utils.mkMap(
Utils.mkEntry(ShareGroup.ShareGroupState.EMPTY,
@ -145,17 +133,15 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
}
/**
* Increment the number of consumer groups.
* Set the number of consumer groups.
* This method should be the only way to update the map and is called by the scheduled task
* that updates the metrics in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}.
* Breaking this will result in inconsistent behavior.
*
* @param state the consumer group state.
* @param consumerGroupGauges The map counting the number of consumer groups in each state.
*/
public void incrementNumConsumerGroups(ConsumerGroupState state) {
TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
if (gaugeCounter != null) {
synchronized (gaugeCounter.timelineLong) {
gaugeCounter.timelineLong.increment();
}
}
public void setConsumerGroupGauges(Map<ConsumerGroupState, Long> consumerGroupGauges) {
this.consumerGroupGauges = consumerGroupGauges;
}
/**
@ -167,20 +153,6 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
}
}
/**
* Decrement the number of consumer groups.
*
* @param state the consumer group state.
*/
public void decrementNumConsumerGroups(ConsumerGroupState state) {
TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
if (gaugeCounter != null) {
synchronized (gaugeCounter.timelineLong) {
gaugeCounter.timelineLong.decrement();
}
}
}
/**
* @return The number of offsets.
*/
@ -219,9 +191,9 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
* @return The number of consumer groups in `state`.
*/
public long numConsumerGroups(ConsumerGroupState state) {
TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
if (gaugeCounter != null) {
return gaugeCounter.atomicLong.get();
Long counter = consumerGroupGauges.get(state);
if (counter != null) {
return counter;
}
return 0L;
}
@ -231,7 +203,7 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
*/
public long numConsumerGroups() {
return consumerGroupGauges.values().stream()
.mapToLong(timelineGaugeCounter -> timelineGaugeCounter.atomicLong.get()).sum();
.mapToLong(Long::longValue).sum();
}
@Override
@ -257,14 +229,6 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
@Override
public void commitUpTo(long offset) {
this.consumerGroupGauges.forEach((__, gaugeCounter) -> {
long value;
synchronized (gaugeCounter.timelineLong) {
value = gaugeCounter.timelineLong.get(offset);
}
gaugeCounter.atomicLong.set(value);
});
synchronized (numClassicGroupsTimelineCounter.timelineLong) {
long value = numClassicGroupsTimelineCounter.timelineLong.get(offset);
numClassicGroupsTimelineCounter.atomicLong.set(value);
@ -286,8 +250,11 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
/**
* Sets the classicGroupGauges.
* This method should be the only way to update the map and is called by the scheduled task
* that updates the metrics in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}.
* Breaking this will result in inconsistent behavior.
*
* @param classicGroupGauges The new classicGroupGauges.
* @param classicGroupGauges The map counting the number of classic groups in each state.
*/
public void setClassicGroupGauges(
Map<ClassicGroupState, Long> classicGroupGauges
@ -295,56 +262,6 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
this.classicGroupGauges = classicGroupGauges;
}
/**
* Called when a consumer group's state has changed. Increment/decrement
* the counter accordingly.
*
* @param oldState The previous state. null value means that it's a new group.
* @param newState The next state. null value means that the group has been removed.
*/
public void onConsumerGroupStateTransition(
ConsumerGroupState oldState,
ConsumerGroupState newState
) {
if (newState != null) {
switch (newState) {
case EMPTY:
incrementNumConsumerGroups(ConsumerGroupState.EMPTY);
break;
case ASSIGNING:
incrementNumConsumerGroups(ConsumerGroupState.ASSIGNING);
break;
case RECONCILING:
incrementNumConsumerGroups(ConsumerGroupState.RECONCILING);
break;
case STABLE:
incrementNumConsumerGroups(ConsumerGroupState.STABLE);
break;
case DEAD:
incrementNumConsumerGroups(ConsumerGroupState.DEAD);
}
}
if (oldState != null) {
switch (oldState) {
case EMPTY:
decrementNumConsumerGroups(ConsumerGroupState.EMPTY);
break;
case ASSIGNING:
decrementNumConsumerGroups(ConsumerGroupState.ASSIGNING);
break;
case RECONCILING:
decrementNumConsumerGroups(ConsumerGroupState.RECONCILING);
break;
case STABLE:
decrementNumConsumerGroups(ConsumerGroupState.STABLE);
break;
case DEAD:
decrementNumConsumerGroups(ConsumerGroupState.DEAD);
}
}
}
public void incrementNumShareGroups(ShareGroup.ShareGroupState state) {
TimelineGaugeCounter gaugeCounter = shareGroupGauges.get(state);
if (gaugeCounter != null) {

View File

@ -885,7 +885,6 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
@Override
protected void maybeUpdateGroupState() {
ConsumerGroupState previousState = state.get();
ConsumerGroupState newState = STABLE;
if (members.isEmpty()) {
newState = EMPTY;
@ -901,7 +900,6 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
}
state.set(newState);
metrics.onConsumerGroupStateTransition(previousState, newState);
}
/**

View File

@ -79,9 +79,9 @@ import java.util.List;
import java.util.Set;
import static org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.CLASSIC_GROUP_SIZE_COUNTER_KEY;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_EXPIRATION_KEY;
import static org.apache.kafka.coordinator.group.GroupCoordinatorShard.GROUP_SIZE_COUNTER_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -1024,7 +1024,7 @@ public class GroupCoordinatorShardTest {
}
@Test
public void testScheduleClassicGroupSizeCounter() {
public void testScheduleGroupSizeCounter() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
@ -1046,21 +1046,21 @@ public class GroupCoordinatorShardTest {
);
coordinator.onLoaded(MetadataImage.EMPTY);
// The classic group size counter is scheduled.
// The counter is scheduled.
assertEquals(
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
);
// Advance the timer to trigger the update.
time.sleep(DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS + 1);
timer.poll();
verify(groupMetadataManager, times(1)).updateClassicGroupSizeCounter();
verify(groupMetadataManager, times(1)).updateGroupSizeCounter();
// The classic group size counter is scheduled.
// The counter is scheduled.
assertEquals(
DEFAULT_GROUP_GAUGES_UPDATE_INTERVAL_MS,
timer.timeout(CLASSIC_GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
timer.timeout(GROUP_SIZE_COUNTER_KEY).deadlineMs - time.milliseconds()
);
}

View File

@ -3589,42 +3589,71 @@ public class GroupMetadataManagerTest {
}
@Test
public void testUpdateClassicGroupSizeCounter() {
String groupId0 = "group-0";
String groupId1 = "group-1";
String groupId2 = "group-2";
String groupId3 = "group-3";
String groupId4 = "group-4";
public void testUpdateGroupSizeCounter() {
List<String> groupIds = new ArrayList<>();
IntStream.range(0, 8).forEach(i -> groupIds.add("group-" + i));
List<String> consumerMemberIds = List.of("consumer-member-id-0", "consumer-member-id-1", "consumer-member-id-2");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroup(new ConsumerGroupBuilder(groupId0, 10))
.withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(0), 10)) // Empty group
.withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(1), 10) // Stable group
.withAssignmentEpoch(10)
.withMember(new ConsumerGroupMember.Builder(consumerMemberIds.get(0))
.setMemberEpoch(10)
.build()))
.withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(2), 10) // Assigning group
.withAssignmentEpoch(9)
.withMember(new ConsumerGroupMember.Builder(consumerMemberIds.get(1))
.setMemberEpoch(9)
.build()))
.withConsumerGroup(new ConsumerGroupBuilder(groupIds.get(3), 10) // Reconciling group
.withAssignmentEpoch(10)
.withMember(new ConsumerGroupMember.Builder(consumerMemberIds.get(2))
.setMemberEpoch(9)
.build()))
.build();
ClassicGroup group1 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId1, true);
ClassicGroup group2 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId2, true);
ClassicGroup group3 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId3, true);
ClassicGroup group4 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId4, true);
ClassicGroup group4 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(4), true);
ClassicGroup group5 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(5), true);
ClassicGroup group6 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(6), true);
ClassicGroup group7 = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupIds.get(7), true);
context.groupMetadataManager.updateClassicGroupSizeCounter();
context.groupMetadataManager.updateGroupSizeCounter();
verify(context.metrics, times(1)).setClassicGroupGauges(eq(Utils.mkMap(
Utils.mkEntry(ClassicGroupState.EMPTY, 4L)
)));
verify(context.metrics, times(1)).setConsumerGroupGauges(eq(Utils.mkMap(
Utils.mkEntry(ConsumerGroup.ConsumerGroupState.EMPTY, 1L),
Utils.mkEntry(ConsumerGroup.ConsumerGroupState.ASSIGNING, 1L),
Utils.mkEntry(ConsumerGroup.ConsumerGroupState.RECONCILING, 1L),
Utils.mkEntry(ConsumerGroup.ConsumerGroupState.STABLE, 1L)
)));
group1.transitionTo(PREPARING_REBALANCE);
group2.transitionTo(PREPARING_REBALANCE);
group2.transitionTo(COMPLETING_REBALANCE);
group3.transitionTo(PREPARING_REBALANCE);
group3.transitionTo(COMPLETING_REBALANCE);
group3.transitionTo(STABLE);
group4.transitionTo(DEAD);
group4.transitionTo(PREPARING_REBALANCE);
group5.transitionTo(PREPARING_REBALANCE);
group5.transitionTo(COMPLETING_REBALANCE);
group6.transitionTo(PREPARING_REBALANCE);
group6.transitionTo(COMPLETING_REBALANCE);
group6.transitionTo(STABLE);
group7.transitionTo(DEAD);
context.groupMetadataManager.updateClassicGroupSizeCounter();
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupIds.get(1), false, Collections.emptyList())
.removeMember(consumerMemberIds.get(0));
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupIds.get(3), false, Collections.emptyList())
.updateMember(new ConsumerGroupMember.Builder(consumerMemberIds.get(2)).setMemberEpoch(10).build());
context.groupMetadataManager.updateGroupSizeCounter();
verify(context.metrics, times(1)).setClassicGroupGauges(eq(Utils.mkMap(
Utils.mkEntry(ClassicGroupState.PREPARING_REBALANCE, 1L),
Utils.mkEntry(ClassicGroupState.COMPLETING_REBALANCE, 1L),
Utils.mkEntry(ClassicGroupState.STABLE, 1L),
Utils.mkEntry(ClassicGroupState.DEAD, 1L)
)));
verify(context.metrics, times(1)).setConsumerGroupGauges(eq(Utils.mkMap(
Utils.mkEntry(ConsumerGroup.ConsumerGroupState.EMPTY, 2L),
Utils.mkEntry(ConsumerGroup.ConsumerGroupState.ASSIGNING, 1L),
Utils.mkEntry(ConsumerGroup.ConsumerGroupState.STABLE, 1L)
)));
}
@Test
@ -9548,75 +9577,6 @@ public class GroupMetadataManagerTest {
verify(context.metrics).record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
}
@Test
public void testOnClassicGroupStateTransitionOnLoading() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
ClassicGroup group = new ClassicGroup(
new LogContext(),
"group-id",
EMPTY,
context.time
);
// Even if there are more group metadata records loaded than tombstone records, the last replayed record
// (tombstone in this test) is the latest state of the group. Hence, the overall metric count should be 0.
IntStream.range(0, 5).forEach(__ ->
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, Collections.emptyMap()))
);
IntStream.range(0, 4).forEach(__ ->
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord("group-id"))
);
}
@Test
public void testOnConsumerGroupStateTransition() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
// Replaying a consumer group epoch record should increment metric.
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("group-id", 1));
verify(context.metrics, times(1)).onConsumerGroupStateTransition(null, ConsumerGroup.ConsumerGroupState.EMPTY);
// Replaying a consumer group epoch record for a group that has already been created should not increment metric.
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("group-id", 1));
verify(context.metrics, times(1)).onConsumerGroupStateTransition(null, ConsumerGroup.ConsumerGroupState.EMPTY);
// Creating and replaying tombstones for a group should remove group and decrement metric.
List<CoordinatorRecord> tombstones = new ArrayList<>();
Group group = context.groupMetadataManager.group("group-id");
group.createGroupTombstoneRecords(tombstones);
tombstones.forEach(context::replay);
assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.group("group-id"));
verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null);
// Replaying a tombstone for a group that has already been removed should not decrement metric.
tombstones.forEach(context::replay);
verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null);
}
@Test
public void testOnConsumerGroupStateTransitionOnLoading() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
// Even if there are more group epoch records loaded than tombstone records, the last replayed record
// (tombstone in this test) is the latest state of the group. Hence, the overall metric count should be 0.
IntStream.range(0, 5).forEach(__ ->
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("group-id", 0))
);
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id"));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id"));
IntStream.range(0, 3).forEach(__ -> {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id"));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id"));
});
verify(context.metrics, times(1)).onConsumerGroupStateTransition(null, ConsumerGroup.ConsumerGroupState.EMPTY);
verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null);
}
@Test
public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() {
String classicGroupId = "classic-group-id";
@ -11153,8 +11113,6 @@ public class GroupMetadataManagerTest {
result.records()
);
verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null);
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
@ -11340,8 +11298,6 @@ public class GroupMetadataManagerTest {
timeout.result.records()
);
verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null);
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
@ -11545,8 +11501,6 @@ public class GroupMetadataManagerTest {
timeout.result.records()
);
verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.RECONCILING, null);
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
@ -11780,8 +11734,6 @@ public class GroupMetadataManagerTest {
result.records
);
verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null);
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)
@ -14298,8 +14250,6 @@ public class GroupMetadataManagerTest {
leaveResult.records()
);
verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null);
// The new classic member 1 has a heartbeat timeout.
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = context.timer.timeout(
classicGroupHeartbeatKey(groupId, memberId1)

View File

@ -17,22 +17,15 @@
package org.apache.kafka.coordinator.group.metrics;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.timeline.SnapshotRegistry;
import com.yammer.metrics.core.MetricsRegistry;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.stream.IntStream;
import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class GroupCoordinatorMetricsShardTest {
@ -47,135 +40,18 @@ public class GroupCoordinatorMetricsShardTest {
GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp);
shard.incrementNumOffsets();
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY);
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING);
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING);
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE);
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD);
snapshotRegistry.idempotentCreateSnapshot(1000);
// The value should not be updated until the offset has been committed.
assertEquals(0, shard.numOffsets());
assertEquals(0, shard.numConsumerGroups());
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING));
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD));
shard.commitUpTo(1000);
assertEquals(1, shard.numOffsets());
assertEquals(5, shard.numConsumerGroups());
assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING));
assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD));
shard.decrementNumOffsets();
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY);
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING);
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING);
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE);
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD);
snapshotRegistry.idempotentCreateSnapshot(2000);
shard.commitUpTo(2000);
assertEquals(0, shard.numOffsets());
assertEquals(0, shard.numConsumerGroups());
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING));
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD));
}
@Test
public void testConsumerGroupStateTransitionMetrics() {
MetricsRegistry registry = new MetricsRegistry();
Metrics metrics = new Metrics();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
TopicPartition tp = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0);
GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics);
GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp);
coordinatorMetrics.activateMetricsShard(shard);
ConsumerGroup group0 = new ConsumerGroup(
snapshotRegistry,
"group-0",
shard
);
ConsumerGroup group1 = new ConsumerGroup(
snapshotRegistry,
"group-1",
shard
);
ConsumerGroup group2 = new ConsumerGroup(
snapshotRegistry,
"group-2",
shard
);
ConsumerGroup group3 = new ConsumerGroup(
snapshotRegistry,
"group-3",
shard
);
IntStream.range(0, 4).forEach(__ -> shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
snapshotRegistry.idempotentCreateSnapshot(1000);
shard.commitUpTo(1000);
assertEquals(4, shard.numConsumerGroups());
assertEquals(4, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
ConsumerGroupMember member0 = group0.getOrMaybeCreateMember("member-id", true);
ConsumerGroupMember member1 = group1.getOrMaybeCreateMember("member-id", true);
ConsumerGroupMember member2 = group2.getOrMaybeCreateMember("member-id", true);
ConsumerGroupMember member3 = group3.getOrMaybeCreateMember("member-id", true);
group0.updateMember(member0);
group1.updateMember(member1);
group2.updateMember(member2);
group3.updateMember(member3);
snapshotRegistry.idempotentCreateSnapshot(2000);
shard.commitUpTo(2000);
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
assertEquals(4, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
group2.setGroupEpoch(1);
group3.setGroupEpoch(1);
snapshotRegistry.idempotentCreateSnapshot(3000);
shard.commitUpTo(3000);
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
assertEquals(2, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
assertEquals(2, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
group2.setTargetAssignmentEpoch(1);
// Set member2 to ASSIGNING state.
new ConsumerGroupMember.Builder(member2)
.setPartitionsPendingRevocation(Collections.singletonMap(Uuid.ZERO_UUID, Collections.singleton(0)))
.build();
snapshotRegistry.idempotentCreateSnapshot(4000);
shard.commitUpTo(4000);
assertEquals(0, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
assertEquals(1, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.RECONCILING));
assertEquals(2, shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
assertGaugeValue(metrics, metrics.metricName("group-count", "group-coordinator-metrics",
Collections.singletonMap("protocol", "consumer")), 4);
assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics",
Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.EMPTY.toString())), 0);
assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics",
Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())), 1);
assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics",
Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.RECONCILING.toString())), 1);
assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics",
Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.STABLE.toString())), 2);
assertGaugeValue(metrics, metrics.metricName("consumer-group-count", "group-coordinator-metrics",
Collections.singletonMap("state", ConsumerGroup.ConsumerGroupState.DEAD.toString())), 0);
}
}

View File

@@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.stream.IntStream;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
@@ -181,9 +182,11 @@ public class GroupCoordinatorMetricsTest {
Utils.mkEntry(ClassicGroupState.DEAD, 1L)
));
IntStream.range(0, 5).forEach(__ -> shard0.incrementNumConsumerGroups(ConsumerGroupState.ASSIGNING));
IntStream.range(0, 5).forEach(__ -> shard1.incrementNumConsumerGroups(ConsumerGroupState.RECONCILING));
IntStream.range(0, 3).forEach(__ -> shard1.decrementNumConsumerGroups(ConsumerGroupState.DEAD));
shard0.setConsumerGroupGauges(Collections.singletonMap(ConsumerGroupState.ASSIGNING, 5L));
shard1.setConsumerGroupGauges(Map.of(
ConsumerGroupState.RECONCILING, 1L,
ConsumerGroupState.DEAD, 1L
));
IntStream.range(0, 6).forEach(__ -> shard0.incrementNumOffsets());
IntStream.range(0, 2).forEach(__ -> shard1.incrementNumOffsets());

View File

@@ -81,8 +81,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class ConsumerGroupTest {
@@ -426,7 +424,6 @@ public class ConsumerGroupTest {
@Test
public void testGroupState() {
Uuid fooTopicId = Uuid.randomUuid();
ConsumerGroup consumerGroup = createConsumerGroup("foo");
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, consumerGroup.state());
@ -1295,46 +1292,6 @@ public class ConsumerGroupTest {
assertEquals(expected, actual);
}
@Test
public void testStateTransitionMetrics() {
    // A freshly constructed ConsumerGroup must not report anything to the metrics
    // shard; only genuine state transitions should be forwarded to it.
    GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
    ConsumerGroup group = new ConsumerGroup(
        new SnapshotRegistry(new LogContext()),
        "group-id",
        metricsShard
    );

    // Creation leaves the group EMPTY without emitting a transition event.
    assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, group.state());
    verify(metricsShard, times(0)).onConsumerGroupStateTransition(null, ConsumerGroup.ConsumerGroupState.EMPTY);

    // Adding a member moves the group from EMPTY to RECONCILING exactly once.
    ConsumerGroupMember addedMember = new ConsumerGroupMember.Builder("member")
        .setMemberEpoch(1)
        .setPreviousMemberEpoch(0)
        .build();
    group.updateMember(addedMember);
    assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, group.state());
    verify(metricsShard, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, ConsumerGroup.ConsumerGroupState.RECONCILING);

    // Bumping the group epoch transitions RECONCILING -> ASSIGNING exactly once.
    group.setGroupEpoch(1);
    assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, group.state());
    verify(metricsShard, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.RECONCILING, ConsumerGroup.ConsumerGroupState.ASSIGNING);

    // Advancing the target assignment epoch transitions ASSIGNING -> STABLE exactly once.
    group.setTargetAssignmentEpoch(1);
    assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, group.state());
    verify(metricsShard, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.ASSIGNING, ConsumerGroup.ConsumerGroupState.STABLE);

    // Removing the last member returns the group to EMPTY exactly once.
    group.removeMember("member");
    assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, group.state());
    verify(metricsShard, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, ConsumerGroup.ConsumerGroupState.EMPTY);
}
@Test
public void testIsInStatesCaseInsensitive() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());