diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index f87af4897a7..477e0499565 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -813,10 +813,10 @@ public class GroupMetadataManager { } if (group == null) { - return new ConsumerGroup(snapshotRegistry, groupId, metrics); + return new ConsumerGroup(snapshotRegistry, groupId); } else if (createIfNotExists && maybeDeleteEmptyClassicGroup(group, records)) { log.info("[GroupId {}] Converted the empty classic group to a consumer group.", groupId); - return new ConsumerGroup(snapshotRegistry, groupId, metrics); + return new ConsumerGroup(snapshotRegistry, groupId); } else { if (group.type() == CONSUMER) { return (ConsumerGroup) group; @@ -980,7 +980,7 @@ public class GroupMetadataManager { } if (group == null) { - ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId); groups.put(groupId, consumerGroup); return consumerGroup; } else if (group.type() == CONSUMER) { @@ -990,7 +990,7 @@ public class GroupMetadataManager { // offsets if no group existed. Simple classic groups are not backed by any records // in the __consumer_offsets topic hence we can safely replace it here. Without this, // replaying consumer group records after offset commit records would not work. - ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId); groups.put(groupId, consumerGroup); return consumerGroup; } else { @@ -1364,7 +1364,6 @@ public class GroupMetadataManager { try { consumerGroup = ConsumerGroup.fromClassicGroup( snapshotRegistry, - metrics, classicGroup, topicHashCache, metadataImage diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java index 19b103c71a7..2db94cb0535 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java @@ -38,7 +38,6 @@ import org.apache.kafka.coordinator.group.Utils; import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; import org.apache.kafka.coordinator.group.classic.ClassicGroup; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; -import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.ModernGroup; @@ -119,11 +118,6 @@ public class ConsumerGroup extends ModernGroup { */ private final TimelineHashMap serverAssignors; - /** - * The coordinator metrics. - */ - private final GroupCoordinatorMetricsShard metrics; - /** * The number of members that use the classic protocol. */ @@ -155,14 +149,12 @@ public class ConsumerGroup extends ModernGroup { public ConsumerGroup( SnapshotRegistry snapshotRegistry, - String groupId, - GroupCoordinatorMetricsShard metrics + String groupId ) { super(snapshotRegistry, groupId); this.state = new TimelineObject<>(snapshotRegistry, EMPTY); this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0); this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0); - this.metrics = Objects.requireNonNull(metrics); this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry); this.classicProtocolMembersSupportedProtocols = new TimelineHashMap<>(snapshotRegistry, 0); this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0); @@ -1130,7 +1122,6 @@ public class ConsumerGroup extends ModernGroup { * Create a new consumer group according to the given classic group. * * @param snapshotRegistry The SnapshotRegistry. - * @param metrics The GroupCoordinatorMetricsShard. * @param classicGroup The converted classic group. * @param topicHashCache The cache for topic hashes. * @param metadataImage The current metadata image for the Kafka cluster. @@ -1141,13 +1132,12 @@ public class ConsumerGroup extends ModernGroup { */ public static ConsumerGroup fromClassicGroup( SnapshotRegistry snapshotRegistry, - GroupCoordinatorMetricsShard metrics, ClassicGroup classicGroup, Map topicHashCache, CoordinatorMetadataImage metadataImage ) { String groupId = classicGroup.groupId(); - ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId); consumerGroup.setGroupEpoch(classicGroup.generationId()); consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId()); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 9a7d3820126..145b7613f1f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -84,7 +84,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignment import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; -import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.modern.share.ShareGroup; import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; @@ -1384,10 +1383,9 @@ public class GroupCoordinatorShardTest { ArgumentCaptor> recordsCapture = ArgumentCaptor.forClass(List.class); SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class); - ConsumerGroup group1 = new ConsumerGroup(snapshotRegistry, "group-id", metricsShard); - ConsumerGroup group2 = new ConsumerGroup(snapshotRegistry, "other-group-id", metricsShard); + ConsumerGroup group1 = new ConsumerGroup(snapshotRegistry, "group-id"); + ConsumerGroup group2 = new ConsumerGroup(snapshotRegistry, "other-group-id"); when(groupMetadataManager.groupIds()).thenReturn(Set.of("group-id", "other-group-id")); when(groupMetadataManager.group("group-id")).thenReturn(group1); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java index 2e6cf0012f7..24212d10982 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java @@ -45,7 +45,6 @@ import org.apache.kafka.coordinator.group.OffsetAndMetadata; import org.apache.kafka.coordinator.group.OffsetExpirationCondition; import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; -import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; @@ -81,7 +80,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; public class ClassicGroupTest { private final String protocolType = "consumer"; @@ -1383,8 +1381,7 @@ public class ClassicGroupTest { ConsumerGroup consumerGroup = new ConsumerGroup( new SnapshotRegistry(logContext), - groupId, - mock(GroupCoordinatorMetricsShard.class) + groupId ); consumerGroup.setGroupEpoch(10); consumerGroup.setTargetAssignmentEpoch(10); @@ -1536,8 +1533,7 @@ public class ClassicGroupTest { ConsumerGroup consumerGroup = new ConsumerGroup( new SnapshotRegistry(logContext), - groupId, - mock(GroupCoordinatorMetricsShard.class) + groupId ); consumerGroup.setGroupEpoch(10); consumerGroup.setTargetAssignmentEpoch(10); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java index 202b91a5e41..5956e51b161 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.errors.IllegalGenerationException; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.protocol.ApiKeys; @@ -45,7 +44,6 @@ import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; import org.apache.kafka.coordinator.group.classic.ClassicGroup; import org.apache.kafka.coordinator.group.classic.ClassicGroupMember; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; -import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.ModernGroup; @@ -83,7 +81,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; public class ConsumerGroupTest { @@ -91,8 +88,7 @@ public class ConsumerGroupTest { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); return new ConsumerGroup( snapshotRegistry, - groupId, - mock(GroupCoordinatorMetricsShard.class) + groupId ); } @@ -700,8 +696,7 @@ public class ConsumerGroupTest { @Test public void testUpdateInvertedAssignment() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class); - ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group", metricsShard); + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group"); Uuid topicId = Uuid.randomUuid(); String memberId1 = "member1"; String memberId2 = "member2"; @@ -916,12 +911,7 @@ public class ConsumerGroupTest { @Test public void testAsListedGroup() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard( - snapshotRegistry, - Map.of(), - new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) - ); - ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard); + ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo"); snapshotRegistry.idempotentCreateSnapshot(0); assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0)); group.updateMember(new ConsumerGroupMember.Builder("member1") @@ -937,8 +927,7 @@ public class ConsumerGroupTest { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); ConsumerGroup group = new ConsumerGroup( snapshotRegistry, - "group-foo", - mock(GroupCoordinatorMetricsShard.class) + "group-foo" ); // Simulate a call from the admin client without member id and member epoch. @@ -997,7 +986,7 @@ public class ConsumerGroupTest { long commitTimestamp = 20000L; long offsetsRetentionMs = 10000L; OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID); - ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class)); + ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id"); Optional offsetExpirationCondition = group.offsetExpirationCondition(); assertTrue(offsetExpirationCondition.isPresent()); @@ -1034,7 +1023,7 @@ public class ConsumerGroupTest { @Test public void testAsDescribedGroup() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class)); + ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-id-1"); snapshotRegistry.idempotentCreateSnapshot(0); assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0)); @@ -1071,12 +1060,7 @@ public class ConsumerGroupTest { @Test public void testIsInStatesCaseInsensitive() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard( - snapshotRegistry, - Map.of(), - new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) - ); - ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard); + ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo"); snapshotRegistry.idempotentCreateSnapshot(0); assertTrue(group.isInStates(Set.of("empty"), 0)); assertFalse(group.isInStates(Set.of("Empty"), 0)); @@ -1307,7 +1291,6 @@ public class ConsumerGroupTest { ConsumerGroup consumerGroup = ConsumerGroup.fromClassicGroup( new SnapshotRegistry(logContext), - mock(GroupCoordinatorMetricsShard.class), classicGroup, new HashMap<>(), metadataImage @@ -1315,8 +1298,7 @@ public class ConsumerGroupTest { ConsumerGroup expectedConsumerGroup = new ConsumerGroup( new SnapshotRegistry(logContext), - groupId, - mock(GroupCoordinatorMetricsShard.class) + groupId ); expectedConsumerGroup.setGroupEpoch(10); expectedConsumerGroup.setTargetAssignmentEpoch(10);