MINOR: Invoke share group rebalance sensor. (#20006)

* The share group rebalance metric was not being invoked at the
appropriate group id bump position.
* This PR solves the issue.
* The metric name has been updated
(s/rebalance-rate/share-group-rebalance-rate,
s/rebalance-count/share-group-rebalance-count/)
* Updated tests in `GroupMetadataManagerTest` and
`GroupCoordinatorMetricsTest`

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Sushant Mahajan 2025-06-21 13:05:19 +05:30 committed by GitHub
parent d442c31e92
commit 815dd93e2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 17 additions and 20 deletions

View File

@ -248,6 +248,7 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPA
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged; import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord; import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord;
@ -2610,6 +2611,7 @@ public class GroupMetadataManager {
groupEpoch += 1; groupEpoch += 1;
records.add(newShareGroupEpochRecord(groupId, groupEpoch, groupMetadataHash)); records.add(newShareGroupEpochRecord(groupId, groupEpoch, groupMetadataHash));
log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash); log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash);
metrics.record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
} }
group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch); group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);

View File

@ -297,14 +297,12 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
Sensor shareGroupRebalanceSensor = metrics.sensor(SHARE_GROUP_REBALANCES_SENSOR_NAME); Sensor shareGroupRebalanceSensor = metrics.sensor(SHARE_GROUP_REBALANCES_SENSOR_NAME);
shareGroupRebalanceSensor.add(new Meter( shareGroupRebalanceSensor.add(new Meter(
metrics.metricName("rebalance-rate", metrics.metricName("share-group-rebalance-rate",
METRICS_GROUP, METRICS_GROUP,
"The rate of share group rebalances", "The rate of share group rebalances"),
SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString()), metrics.metricName("share-group-rebalance-count",
metrics.metricName("rebalance-count",
METRICS_GROUP, METRICS_GROUP,
"The total number of share group rebalances", "The total number of share group rebalances")));
SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString())));
Sensor streamsGroupRebalanceSensor = metrics.sensor(STREAMS_GROUP_REBALANCES_SENSOR_NAME); Sensor streamsGroupRebalanceSensor = metrics.sensor(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
streamsGroupRebalanceSensor.add(new Meter( streamsGroupRebalanceSensor.add(new Meter(

View File

@ -203,6 +203,7 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPA
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals;
@ -19699,6 +19700,7 @@ public class GroupMetadataManagerTest {
.setMemberId(Uuid.randomUuid().toString()) .setMemberId(Uuid.randomUuid().toString())
.setMemberEpoch(1) .setMemberEpoch(1)
.setSubscribedTopicNames(List.of("foo", "bar")))); .setSubscribedTopicNames(List.of("foo", "bar"))));
verify(context.metrics, times(0)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
} }
@Test @Test
@ -19785,6 +19787,7 @@ public class GroupMetadataManagerTest {
.setGroupId(groupId) .setGroupId(groupId)
.setMemberEpoch(0) .setMemberEpoch(0)
.setSubscribedTopicNames(List.of("foo", "bar")))); .setSubscribedTopicNames(List.of("foo", "bar"))));
verify(context.metrics, times(0)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
} }
@Test @Test
@ -22306,6 +22309,7 @@ public class GroupMetadataManagerTest {
); );
assertEquals(Map.of(t1Uuid, Set.of(0, 1), t2Uuid, Set.of(0, 1)), context.groupMetadataManager.initializedShareGroupPartitions(groupId)); assertEquals(Map.of(t1Uuid, Set.of(0, 1), t2Uuid, Set.of(0, 1)), context.groupMetadataManager.initializedShareGroupPartitions(groupId));
verify(context.metrics, times(2)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
} }
@Test @Test
@ -22415,6 +22419,7 @@ public class GroupMetadataManagerTest {
2, 2,
true true
); );
verify(context.metrics, times(2)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
} }
@Test @Test

View File

@ -104,14 +104,8 @@ public class GroupCoordinatorMetricsTest {
"group-count", "group-count",
GroupCoordinatorMetrics.METRICS_GROUP, GroupCoordinatorMetrics.METRICS_GROUP,
Map.of("protocol", Group.GroupType.SHARE.toString())), Map.of("protocol", Group.GroupType.SHARE.toString())),
metrics.metricName( metrics.metricName("share-group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP),
"rebalance-rate", metrics.metricName("share-group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP),
GroupCoordinatorMetrics.METRICS_GROUP,
Map.of("protocol", Group.GroupType.SHARE.toString())),
metrics.metricName(
"rebalance-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Map.of("protocol", Group.GroupType.SHARE.toString())),
metrics.metricName( metrics.metricName(
"share-group-count", "share-group-count",
GroupCoordinatorMetrics.METRICS_GROUP, GroupCoordinatorMetrics.METRICS_GROUP,
@ -304,16 +298,14 @@ public class GroupCoordinatorMetricsTest {
shard.record(SHARE_GROUP_REBALANCES_SENSOR_NAME, 50); shard.record(SHARE_GROUP_REBALANCES_SENSOR_NAME, 50);
assertMetricValue(metrics, metrics.metricName( assertMetricValue(metrics, metrics.metricName(
"rebalance-rate", "share-group-rebalance-rate",
GroupCoordinatorMetrics.METRICS_GROUP, GroupCoordinatorMetrics.METRICS_GROUP,
"The rate of share group rebalances", "The rate of share group rebalances"
"protocol", "share"
), 5.0 / 3.0); ), 5.0 / 3.0);
assertMetricValue(metrics, metrics.metricName( assertMetricValue(metrics, metrics.metricName(
"rebalance-count", "share-group-rebalance-count",
GroupCoordinatorMetrics.METRICS_GROUP, GroupCoordinatorMetrics.METRICS_GROUP,
"The total number of share group rebalances", "The total number of share group rebalances"
"protocol", "share"
), 50); ), 50);
shard.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME, 50); shard.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME, 50);