From cfa0b416efbd354f5209492c121d47daa8436e53 Mon Sep 17 00:00:00 2001 From: Lan Ding Date: Fri, 19 Sep 2025 16:32:41 +0800 Subject: [PATCH] MINOR: Remove metrics attribute from StreamsGroup (#20559) The `metrics` attribute in `StreamsGroup` is not used anymore. This patch removes it. Reviewers: Ken Huang , Lucas Brutschy , Chia-Ping Tsai --- .../group/GroupMetadataManager.java | 6 ++-- .../group/streams/StreamsGroup.java | 10 +----- .../group/streams/StreamsGroupTest.java | 34 ++++++------------- 3 files changed, 14 insertions(+), 36 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 477e0499565..4f156d75daa 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -851,10 +851,10 @@ public class GroupMetadataManager { Group group = groups.get(groupId); if (group == null) { - return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics); + return new StreamsGroup(logContext, snapshotRegistry, groupId); } else if (maybeDeleteEmptyClassicGroup(group, records)) { log.info("[GroupId {}] Converted the empty classic group to a streams group.", groupId); - return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics); + return new StreamsGroup(logContext, snapshotRegistry, groupId); } else { return castToStreamsGroup(group); } @@ -1023,7 +1023,7 @@ public class GroupMetadataManager { } if (group == null) { - StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, groupId, metrics); + StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, groupId); groups.put(groupId, streamsGroup); return streamsGroup; } else if (group.type() == STREAMS) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index b75f3926d08..72d4386321e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -31,7 +31,6 @@ import org.apache.kafka.coordinator.group.Group; import org.apache.kafka.coordinator.group.OffsetExpirationCondition; import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; import org.apache.kafka.coordinator.group.Utils; -import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology; import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology; import org.apache.kafka.timeline.SnapshotRegistry; @@ -179,11 +178,6 @@ public class StreamsGroup implements Group { private final TimelineHashMap>> currentStandbyTaskToProcessIds; private final TimelineHashMap>> currentWarmupTaskToProcessIds; - /** - * The coordinator metrics. - */ - private final GroupCoordinatorMetricsShard metrics; - /** * The Streams topology. */ @@ -220,8 +214,7 @@ public class StreamsGroup implements Group { public StreamsGroup( LogContext logContext, SnapshotRegistry snapshotRegistry, - String groupId, - GroupCoordinatorMetricsShard metrics + String groupId ) { this.log = logContext.logger(StreamsGroup.class); this.logContext = logContext; @@ -238,7 +231,6 @@ public class StreamsGroup implements Group { this.currentActiveTaskToProcessId = new TimelineHashMap<>(snapshotRegistry, 0); this.currentStandbyTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0); this.currentWarmupTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0); - this.metrics = Objects.requireNonNull(metrics); this.topology = new TimelineObject<>(snapshotRegistry, Optional.empty()); this.configuredTopology = new TimelineObject<>(snapshotRegistry, Optional.empty()); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index a76e282be6e..71feb2a1e90 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.coordinator.group.streams; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.StaleMemberEpochException; @@ -43,7 +42,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignment import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; -import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState; import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole; import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology; @@ -90,8 +88,7 @@ public class StreamsGroupTest { return new StreamsGroup( LOG_CONTEXT, snapshotRegistry, - groupId, - mock(GroupCoordinatorMetricsShard.class) + groupId ); } @@ -693,8 +690,7 @@ public class StreamsGroupTest { StreamsGroup group = new StreamsGroup( LOG_CONTEXT, snapshotRegistry, - "group-foo", - mock(GroupCoordinatorMetricsShard.class) + "group-foo" ); group.setGroupEpoch(1); group.setTopology(new StreamsTopology(1, Map.of())); @@ -719,8 +715,7 @@ public class StreamsGroupTest { StreamsGroup group = new StreamsGroup( LOG_CONTEXT, snapshotRegistry, - "group-foo", - mock(GroupCoordinatorMetricsShard.class) + "group-foo" ); // Simulate a call from the admin client without member ID and member epoch. @@ -790,7 +785,7 @@ public class StreamsGroupTest { long commitTimestamp = 20000L; long offsetsRetentionMs = 10000L; OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID); - StreamsGroup group = new StreamsGroup(LOG_CONTEXT, new SnapshotRegistry(LOG_CONTEXT), "group-id", mock(GroupCoordinatorMetricsShard.class)); + StreamsGroup group = new StreamsGroup(LOG_CONTEXT, new SnapshotRegistry(LOG_CONTEXT), "group-id"); Optional offsetExpirationCondition = group.offsetExpirationCondition(); assertTrue(offsetExpirationCondition.isPresent()); @@ -803,7 +798,7 @@ public class StreamsGroupTest { @Test public void testAsDescribedGroup() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class)); + StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-1"); snapshotRegistry.idempotentCreateSnapshot(0); assertEquals(StreamsGroup.StreamsGroupState.EMPTY.toString(), group.stateAsString(0)); @@ -887,12 +882,7 @@ public class StreamsGroupTest { @Test public void testIsInStatesCaseInsensitiveAndUnderscored() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT); - GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard( - snapshotRegistry, - Map.of(), - new TopicPartition("__consumer_offsets", 0) - ); - StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-foo", metricsShard); + StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-foo"); snapshotRegistry.idempotentCreateSnapshot(0); assertTrue(group.isInStates(Set.of("empty"), 0)); assertFalse(group.isInStates(Set.of("Empty"), 0)); @@ -911,8 +901,7 @@ public class StreamsGroupTest { StreamsGroup streamsGroup = new StreamsGroup( LOG_CONTEXT, snapshotRegistry, - "group-foo", - mock(GroupCoordinatorMetricsShard.class) + "group-foo" ); MetadataImage metadataImage = new MetadataImageBuilder() @@ -933,8 +922,7 @@ public class StreamsGroupTest { StreamsGroup streamsGroup = new StreamsGroup( LOG_CONTEXT, snapshotRegistry, - "test-group", - mock(GroupCoordinatorMetricsShard.class) + "test-group" ); streamsGroup.updateMember(new StreamsGroupMember.Builder("member1") .setMemberEpoch(1) @@ -961,8 +949,7 @@ public class StreamsGroupTest { public void testIsSubscribedToTopic() { LogContext logContext = new LogContext(); SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); - GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class); - StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group", metricsShard); + StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group"); assertFalse(streamsGroup.isSubscribedToTopic("test-topic1")); assertFalse(streamsGroup.isSubscribedToTopic("test-topic2")); @@ -1008,8 +995,7 @@ public class StreamsGroupTest { String memberId2 = "test-member-id2"; LogContext logContext = new LogContext(); SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); - GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class); - StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group", metricsShard); + StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group"); streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId1)); streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId2));