mirror of https://github.com/apache/kafka.git
MINOR: Remove metrics attribute from ConsumerGroup (#20542)
The `metrics` attribute in `ConsumerGroup` is not used anymore. This patch removes it. Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Dongnuo Lyu <dlyu@confluent.io>
This commit is contained in:
parent
d6fdbfcf15
commit
8c8e93c4a1
|
@ -813,10 +813,10 @@ public class GroupMetadataManager {
|
|||
}
|
||||
|
||||
if (group == null) {
|
||||
return new ConsumerGroup(snapshotRegistry, groupId, metrics);
|
||||
return new ConsumerGroup(snapshotRegistry, groupId);
|
||||
} else if (createIfNotExists && maybeDeleteEmptyClassicGroup(group, records)) {
|
||||
log.info("[GroupId {}] Converted the empty classic group to a consumer group.", groupId);
|
||||
return new ConsumerGroup(snapshotRegistry, groupId, metrics);
|
||||
return new ConsumerGroup(snapshotRegistry, groupId);
|
||||
} else {
|
||||
if (group.type() == CONSUMER) {
|
||||
return (ConsumerGroup) group;
|
||||
|
@ -980,7 +980,7 @@ public class GroupMetadataManager {
|
|||
}
|
||||
|
||||
if (group == null) {
|
||||
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
|
||||
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
|
||||
groups.put(groupId, consumerGroup);
|
||||
return consumerGroup;
|
||||
} else if (group.type() == CONSUMER) {
|
||||
|
@ -990,7 +990,7 @@ public class GroupMetadataManager {
|
|||
// offsets if no group existed. Simple classic groups are not backed by any records
|
||||
// in the __consumer_offsets topic hence we can safely replace it here. Without this,
|
||||
// replaying consumer group records after offset commit records would not work.
|
||||
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
|
||||
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
|
||||
groups.put(groupId, consumerGroup);
|
||||
return consumerGroup;
|
||||
} else {
|
||||
|
@ -1364,7 +1364,6 @@ public class GroupMetadataManager {
|
|||
try {
|
||||
consumerGroup = ConsumerGroup.fromClassicGroup(
|
||||
snapshotRegistry,
|
||||
metrics,
|
||||
classicGroup,
|
||||
topicHashCache,
|
||||
metadataImage
|
||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.kafka.coordinator.group.Utils;
|
|||
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
|
||||
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
|
||||
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
|
||||
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
|
||||
import org.apache.kafka.coordinator.group.modern.Assignment;
|
||||
import org.apache.kafka.coordinator.group.modern.MemberState;
|
||||
import org.apache.kafka.coordinator.group.modern.ModernGroup;
|
||||
|
@ -119,11 +118,6 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
|
|||
*/
|
||||
private final TimelineHashMap<String, Integer> serverAssignors;
|
||||
|
||||
/**
|
||||
* The coordinator metrics.
|
||||
*/
|
||||
private final GroupCoordinatorMetricsShard metrics;
|
||||
|
||||
/**
|
||||
* The number of members that use the classic protocol.
|
||||
*/
|
||||
|
@ -155,14 +149,12 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
|
|||
|
||||
public ConsumerGroup(
|
||||
SnapshotRegistry snapshotRegistry,
|
||||
String groupId,
|
||||
GroupCoordinatorMetricsShard metrics
|
||||
String groupId
|
||||
) {
|
||||
super(snapshotRegistry, groupId);
|
||||
this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
|
||||
this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||
this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||
this.metrics = Objects.requireNonNull(metrics);
|
||||
this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry);
|
||||
this.classicProtocolMembersSupportedProtocols = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||
this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||
|
@ -1130,7 +1122,6 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
|
|||
* Create a new consumer group according to the given classic group.
|
||||
*
|
||||
* @param snapshotRegistry The SnapshotRegistry.
|
||||
* @param metrics The GroupCoordinatorMetricsShard.
|
||||
* @param classicGroup The converted classic group.
|
||||
* @param topicHashCache The cache for topic hashes.
|
||||
* @param metadataImage The current metadata image for the Kafka cluster.
|
||||
|
@ -1141,13 +1132,12 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
|
|||
*/
|
||||
public static ConsumerGroup fromClassicGroup(
|
||||
SnapshotRegistry snapshotRegistry,
|
||||
GroupCoordinatorMetricsShard metrics,
|
||||
ClassicGroup classicGroup,
|
||||
Map<String, Long> topicHashCache,
|
||||
CoordinatorMetadataImage metadataImage
|
||||
) {
|
||||
String groupId = classicGroup.groupId();
|
||||
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
|
||||
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
|
||||
consumerGroup.setGroupEpoch(classicGroup.generationId());
|
||||
consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
|
||||
|
||||
|
|
|
@ -84,7 +84,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignment
|
|||
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
|
||||
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
|
||||
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
|
||||
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
|
||||
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
|
||||
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
|
||||
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
|
||||
|
@ -1384,10 +1383,9 @@ public class GroupCoordinatorShardTest {
|
|||
ArgumentCaptor<List<CoordinatorRecord>> recordsCapture = ArgumentCaptor.forClass(List.class);
|
||||
|
||||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
|
||||
|
||||
ConsumerGroup group1 = new ConsumerGroup(snapshotRegistry, "group-id", metricsShard);
|
||||
ConsumerGroup group2 = new ConsumerGroup(snapshotRegistry, "other-group-id", metricsShard);
|
||||
ConsumerGroup group1 = new ConsumerGroup(snapshotRegistry, "group-id");
|
||||
ConsumerGroup group2 = new ConsumerGroup(snapshotRegistry, "other-group-id");
|
||||
|
||||
when(groupMetadataManager.groupIds()).thenReturn(Set.of("group-id", "other-group-id"));
|
||||
when(groupMetadataManager.group("group-id")).thenReturn(group1);
|
||||
|
|
|
@ -45,7 +45,6 @@ import org.apache.kafka.coordinator.group.OffsetAndMetadata;
|
|||
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
|
||||
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
|
||||
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
|
||||
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
|
||||
import org.apache.kafka.coordinator.group.modern.Assignment;
|
||||
import org.apache.kafka.coordinator.group.modern.MemberState;
|
||||
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
|
||||
|
@ -81,7 +80,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
|||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class ClassicGroupTest {
|
||||
private final String protocolType = "consumer";
|
||||
|
@ -1383,8 +1381,7 @@ public class ClassicGroupTest {
|
|||
|
||||
ConsumerGroup consumerGroup = new ConsumerGroup(
|
||||
new SnapshotRegistry(logContext),
|
||||
groupId,
|
||||
mock(GroupCoordinatorMetricsShard.class)
|
||||
groupId
|
||||
);
|
||||
consumerGroup.setGroupEpoch(10);
|
||||
consumerGroup.setTargetAssignmentEpoch(10);
|
||||
|
@ -1536,8 +1533,7 @@ public class ClassicGroupTest {
|
|||
|
||||
ConsumerGroup consumerGroup = new ConsumerGroup(
|
||||
new SnapshotRegistry(logContext),
|
||||
groupId,
|
||||
mock(GroupCoordinatorMetricsShard.class)
|
||||
groupId
|
||||
);
|
||||
consumerGroup.setGroupEpoch(10);
|
||||
consumerGroup.setTargetAssignmentEpoch(10);
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.kafka.common.errors.IllegalGenerationException;
|
|||
import org.apache.kafka.common.errors.StaleMemberEpochException;
|
||||
import org.apache.kafka.common.errors.UnknownMemberIdException;
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||
import org.apache.kafka.common.internals.Topic;
|
||||
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
|
||||
import org.apache.kafka.common.message.JoinGroupRequestData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
|
@ -45,7 +44,6 @@ import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
|
|||
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
|
||||
import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
|
||||
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
|
||||
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
|
||||
import org.apache.kafka.coordinator.group.modern.Assignment;
|
||||
import org.apache.kafka.coordinator.group.modern.MemberState;
|
||||
import org.apache.kafka.coordinator.group.modern.ModernGroup;
|
||||
|
@ -83,7 +81,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
|||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class ConsumerGroupTest {
|
||||
|
||||
|
@ -91,8 +88,7 @@ public class ConsumerGroupTest {
|
|||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
return new ConsumerGroup(
|
||||
snapshotRegistry,
|
||||
groupId,
|
||||
mock(GroupCoordinatorMetricsShard.class)
|
||||
groupId
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -700,8 +696,7 @@ public class ConsumerGroupTest {
|
|||
@Test
|
||||
public void testUpdateInvertedAssignment() {
|
||||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
|
||||
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group", metricsShard);
|
||||
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group");
|
||||
Uuid topicId = Uuid.randomUuid();
|
||||
String memberId1 = "member1";
|
||||
String memberId2 = "member2";
|
||||
|
@ -916,12 +911,7 @@ public class ConsumerGroupTest {
|
|||
@Test
|
||||
public void testAsListedGroup() {
|
||||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard(
|
||||
snapshotRegistry,
|
||||
Map.of(),
|
||||
new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
|
||||
);
|
||||
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard);
|
||||
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
|
||||
snapshotRegistry.idempotentCreateSnapshot(0);
|
||||
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
|
||||
group.updateMember(new ConsumerGroupMember.Builder("member1")
|
||||
|
@ -937,8 +927,7 @@ public class ConsumerGroupTest {
|
|||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
ConsumerGroup group = new ConsumerGroup(
|
||||
snapshotRegistry,
|
||||
"group-foo",
|
||||
mock(GroupCoordinatorMetricsShard.class)
|
||||
"group-foo"
|
||||
);
|
||||
|
||||
// Simulate a call from the admin client without member id and member epoch.
|
||||
|
@ -997,7 +986,7 @@ public class ConsumerGroupTest {
|
|||
long commitTimestamp = 20000L;
|
||||
long offsetsRetentionMs = 10000L;
|
||||
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
|
||||
ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class));
|
||||
ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id");
|
||||
|
||||
Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
|
||||
assertTrue(offsetExpirationCondition.isPresent());
|
||||
|
@ -1034,7 +1023,7 @@ public class ConsumerGroupTest {
|
|||
@Test
|
||||
public void testAsDescribedGroup() {
|
||||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class));
|
||||
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-id-1");
|
||||
snapshotRegistry.idempotentCreateSnapshot(0);
|
||||
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
|
||||
|
||||
|
@ -1071,12 +1060,7 @@ public class ConsumerGroupTest {
|
|||
@Test
|
||||
public void testIsInStatesCaseInsensitive() {
|
||||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard(
|
||||
snapshotRegistry,
|
||||
Map.of(),
|
||||
new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
|
||||
);
|
||||
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard);
|
||||
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
|
||||
snapshotRegistry.idempotentCreateSnapshot(0);
|
||||
assertTrue(group.isInStates(Set.of("empty"), 0));
|
||||
assertFalse(group.isInStates(Set.of("Empty"), 0));
|
||||
|
@ -1307,7 +1291,6 @@ public class ConsumerGroupTest {
|
|||
|
||||
ConsumerGroup consumerGroup = ConsumerGroup.fromClassicGroup(
|
||||
new SnapshotRegistry(logContext),
|
||||
mock(GroupCoordinatorMetricsShard.class),
|
||||
classicGroup,
|
||||
new HashMap<>(),
|
||||
metadataImage
|
||||
|
@ -1315,8 +1298,7 @@ public class ConsumerGroupTest {
|
|||
|
||||
ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
|
||||
new SnapshotRegistry(logContext),
|
||||
groupId,
|
||||
mock(GroupCoordinatorMetricsShard.class)
|
||||
groupId
|
||||
);
|
||||
expectedConsumerGroup.setGroupEpoch(10);
|
||||
expectedConsumerGroup.setTargetAssignmentEpoch(10);
|
||||
|
|
Loading…
Reference in New Issue