MINOR: Remove metrics attribute from StreamsGroup (#20559)

The `metrics` attribute in `StreamsGroup` is not used anymore. This
patch removes it.

Reviewers: Ken Huang <s7133700@gmail.com>, Lucas Brutschy
 <lbrutschy@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Lan Ding 2025-09-19 16:32:41 +08:00 committed by GitHub
parent d067c6c040
commit cfa0b416ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 14 additions and 36 deletions

View File

@ -851,10 +851,10 @@ public class GroupMetadataManager {
Group group = groups.get(groupId);
if (group == null) {
return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
return new StreamsGroup(logContext, snapshotRegistry, groupId);
} else if (maybeDeleteEmptyClassicGroup(group, records)) {
log.info("[GroupId {}] Converted the empty classic group to a streams group.", groupId);
return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
return new StreamsGroup(logContext, snapshotRegistry, groupId);
} else {
return castToStreamsGroup(group);
}
@ -1023,7 +1023,7 @@ public class GroupMetadataManager {
}
if (group == null) {
StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, groupId);
groups.put(groupId, streamsGroup);
return streamsGroup;
} else if (group.type() == STREAMS) {

View File

@ -31,7 +31,6 @@ import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.timeline.SnapshotRegistry;
@ -179,11 +178,6 @@ public class StreamsGroup implements Group {
private final TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentStandbyTaskToProcessIds;
private final TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentWarmupTaskToProcessIds;
/**
* The coordinator metrics.
*/
private final GroupCoordinatorMetricsShard metrics;
/**
* The Streams topology.
*/
@ -220,8 +214,7 @@ public class StreamsGroup implements Group {
public StreamsGroup(
LogContext logContext,
SnapshotRegistry snapshotRegistry,
String groupId,
GroupCoordinatorMetricsShard metrics
String groupId
) {
this.log = logContext.logger(StreamsGroup.class);
this.logContext = logContext;
@ -238,7 +231,6 @@ public class StreamsGroup implements Group {
this.currentActiveTaskToProcessId = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentStandbyTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentWarmupTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0);
this.metrics = Objects.requireNonNull(metrics);
this.topology = new TimelineObject<>(snapshotRegistry, Optional.empty());
this.configuredTopology = new TimelineObject<>(snapshotRegistry, Optional.empty());
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
@ -43,7 +42,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignment
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
@ -90,8 +88,7 @@ public class StreamsGroupTest {
return new StreamsGroup(
LOG_CONTEXT,
snapshotRegistry,
groupId,
mock(GroupCoordinatorMetricsShard.class)
groupId
);
}
@ -693,8 +690,7 @@ public class StreamsGroupTest {
StreamsGroup group = new StreamsGroup(
LOG_CONTEXT,
snapshotRegistry,
"group-foo",
mock(GroupCoordinatorMetricsShard.class)
"group-foo"
);
group.setGroupEpoch(1);
group.setTopology(new StreamsTopology(1, Map.of()));
@ -719,8 +715,7 @@ public class StreamsGroupTest {
StreamsGroup group = new StreamsGroup(
LOG_CONTEXT,
snapshotRegistry,
"group-foo",
mock(GroupCoordinatorMetricsShard.class)
"group-foo"
);
// Simulate a call from the admin client without member ID and member epoch.
@ -790,7 +785,7 @@ public class StreamsGroupTest {
long commitTimestamp = 20000L;
long offsetsRetentionMs = 10000L;
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, new SnapshotRegistry(LOG_CONTEXT), "group-id", mock(GroupCoordinatorMetricsShard.class));
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, new SnapshotRegistry(LOG_CONTEXT), "group-id");
Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
assertTrue(offsetExpirationCondition.isPresent());
@ -803,7 +798,7 @@ public class StreamsGroupTest {
@Test
public void testAsDescribedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class));
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-1");
snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(StreamsGroup.StreamsGroupState.EMPTY.toString(), group.stateAsString(0));
@ -887,12 +882,7 @@ public class StreamsGroupTest {
@Test
public void testIsInStatesCaseInsensitiveAndUnderscored() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard(
snapshotRegistry,
Map.of(),
new TopicPartition("__consumer_offsets", 0)
);
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-foo", metricsShard);
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-foo");
snapshotRegistry.idempotentCreateSnapshot(0);
assertTrue(group.isInStates(Set.of("empty"), 0));
assertFalse(group.isInStates(Set.of("Empty"), 0));
@ -911,8 +901,7 @@ public class StreamsGroupTest {
StreamsGroup streamsGroup = new StreamsGroup(
LOG_CONTEXT,
snapshotRegistry,
"group-foo",
mock(GroupCoordinatorMetricsShard.class)
"group-foo"
);
MetadataImage metadataImage = new MetadataImageBuilder()
@ -933,8 +922,7 @@ public class StreamsGroupTest {
StreamsGroup streamsGroup = new StreamsGroup(
LOG_CONTEXT,
snapshotRegistry,
"test-group",
mock(GroupCoordinatorMetricsShard.class)
"test-group"
);
streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(1)
@ -961,8 +949,7 @@ public class StreamsGroupTest {
public void testIsSubscribedToTopic() {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group", metricsShard);
StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group");
assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
@ -1008,8 +995,7 @@ public class StreamsGroupTest {
String memberId2 = "test-member-id2";
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group", metricsShard);
StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group");
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId1));
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId2));