diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 9e34d1e4029..90424ba9f0a 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -249,6 +249,7 @@ + diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 56725fa67d0..18d94be7aa8 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -36,7 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition, Uuid} import org.apache.kafka.coordinator.group -import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics +import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics} import org.apache.kafka.coordinator.group.util.SystemTimerReaper import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde} import org.apache.kafka.image.publisher.MetadataPublisher @@ -588,6 +588,7 @@ class BrokerServer( .withLoader(loader) .withWriter(writer) .withCoordinatorRuntimeMetrics(new GroupCoordinatorRuntimeMetrics(metrics)) + .withGroupCoordinatorMetrics(new GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics)) .build() } else { GroupCoordinatorAdapter( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index b184ab9b83b..817f063b6ae 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -64,6 +64,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier; import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor; import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; @@ -105,6 +106,7 @@ public class GroupCoordinatorService implements GroupCoordinator { private Time time; private Timer timer; private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics; + private GroupCoordinatorMetrics groupCoordinatorMetrics; public Builder( int nodeId, @@ -139,6 +141,11 @@ public class GroupCoordinatorService implements GroupCoordinator { return this; } + public Builder withGroupCoordinatorMetrics(GroupCoordinatorMetrics groupCoordinatorMetrics) { + this.groupCoordinatorMetrics = groupCoordinatorMetrics; + return this; + } + public GroupCoordinatorService build() { if (config == null) throw new IllegalArgumentException("Config must be set."); @@ -152,6 +159,8 @@ public class GroupCoordinatorService implements GroupCoordinator { throw new IllegalArgumentException("Timer must be set."); if (coordinatorRuntimeMetrics == null) throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set."); + if (groupCoordinatorMetrics == null) + throw new IllegalArgumentException("GroupCoordinatorMetrics must be set."); String logPrefix = String.format("GroupCoordinator id=%d", nodeId); LogContext logContext = new LogContext(String.format("[%s] ", logPrefix)); @@ -179,12 +188,14 @@ public class GroupCoordinatorService implements GroupCoordinator { .withCoordinatorShardBuilderSupplier(supplier) .withTime(time) .withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics) + .withCoordinatorMetrics(groupCoordinatorMetrics) .build(); return new GroupCoordinatorService( logContext, config, - runtime + runtime, + groupCoordinatorMetrics ); } } @@ -204,6 +215,11 @@ public class GroupCoordinatorService implements GroupCoordinator { */ private final CoordinatorRuntime runtime; + /** + * The metrics registry. + */ + private final GroupCoordinatorMetrics groupCoordinatorMetrics; + /** * Boolean indicating whether the coordinator is active or not. */ @@ -224,11 +240,13 @@ public class GroupCoordinatorService implements GroupCoordinator { GroupCoordinatorService( LogContext logContext, GroupCoordinatorConfig config, - CoordinatorRuntime runtime + CoordinatorRuntime runtime, + GroupCoordinatorMetrics groupCoordinatorMetrics ) { this.log = logContext.logger(CoordinatorLoader.class); this.config = config; this.runtime = runtime; + this.groupCoordinatorMetrics = groupCoordinatorMetrics; } /** @@ -950,6 +968,7 @@ public class GroupCoordinatorService implements GroupCoordinator { log.info("Shutting down."); isActive.set(false); Utils.closeQuietly(runtime, "coordinator runtime"); + Utils.closeQuietly(groupCoordinatorMetrics, "group coordinator metrics"); log.info("Shutdown complete."); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index d0e67c1e693..65b4a5c8556 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.DescribeGroupsResponseData; @@ -57,6 +58,10 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics; +import org.apache.kafka.coordinator.group.metrics.CoordinatorMetricsShard; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.runtime.CoordinatorShard; import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilder; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; @@ -90,6 +95,8 @@ public class GroupCoordinatorShard implements CoordinatorShard { private SnapshotRegistry snapshotRegistry; private Time time; private CoordinatorTimer timer; + private CoordinatorMetrics coordinatorMetrics; + private TopicPartition topicPartition; public Builder( GroupCoordinatorConfig config @@ -121,6 +128,20 @@ public class GroupCoordinatorShard implements CoordinatorShard { return this; } + @Override + public CoordinatorShardBuilder withCoordinatorMetrics( + CoordinatorMetrics coordinatorMetrics + ) { + this.coordinatorMetrics = coordinatorMetrics; + return this; + } + + @Override + public CoordinatorShardBuilder withTopicPartition(TopicPartition topicPartition) { + this.topicPartition = topicPartition; + return this; + } + @Override public CoordinatorShardBuilder withSnapshotRegistry( SnapshotRegistry snapshotRegistry @@ -140,6 +161,13 @@ public class GroupCoordinatorShard implements CoordinatorShard { throw new IllegalArgumentException("Time must be set."); if (timer == null) throw new IllegalArgumentException("Timer must be set."); + if (coordinatorMetrics == null || !(coordinatorMetrics instanceof GroupCoordinatorMetrics)) + throw new IllegalArgumentException("CoordinatorMetrics must be set and be of type GroupCoordinatorMetrics."); + if (topicPartition == null) + throw new IllegalArgumentException("TopicPartition must be set."); + + GroupCoordinatorMetricsShard metricsShard = ((GroupCoordinatorMetrics) coordinatorMetrics) + .newMetricsShard(snapshotRegistry, topicPartition); GroupMetadataManager groupMetadataManager = new GroupMetadataManager.Builder() .withLogContext(logContext) @@ -153,6 +181,7 @@ public class GroupCoordinatorShard implements CoordinatorShard { .withGenericGroupNewMemberJoinTimeoutMs(config.genericGroupNewMemberJoinTimeoutMs) .withGenericGroupMinSessionTimeoutMs(config.genericGroupMinSessionTimeoutMs) .withGenericGroupMaxSessionTimeoutMs(config.genericGroupMaxSessionTimeoutMs) + .withGroupCoordinatorMetricsShard(metricsShard) .build(); OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder() @@ -161,6 +190,7 @@ public class GroupCoordinatorShard implements CoordinatorShard { .withTime(time) .withGroupMetadataManager(groupMetadataManager) .withGroupCoordinatorConfig(config) + .withGroupCoordinatorMetricsShard(metricsShard) .build(); return new GroupCoordinatorShard( @@ -168,7 +198,9 @@ public class GroupCoordinatorShard implements CoordinatorShard { groupMetadataManager, offsetMetadataManager, timer, - config + config, + coordinatorMetrics, + metricsShard ); } } @@ -205,25 +237,41 @@ public class GroupCoordinatorShard implements CoordinatorShard { */ private final GroupCoordinatorConfig config; + /** + * The coordinator metrics. + */ + private final CoordinatorMetrics coordinatorMetrics; + + /** + * The coordinator metrics shard. + */ + private final CoordinatorMetricsShard metricsShard; + /** * Constructor. * * @param logContext The log context. * @param groupMetadataManager The group metadata manager. * @param offsetMetadataManager The offset metadata manager. + * @param coordinatorMetrics The coordinator metrics. + * @param metricsShard The coordinator metrics shard. */ GroupCoordinatorShard( LogContext logContext, GroupMetadataManager groupMetadataManager, OffsetMetadataManager offsetMetadataManager, CoordinatorTimer timer, - GroupCoordinatorConfig config + GroupCoordinatorConfig config, + CoordinatorMetrics coordinatorMetrics, + CoordinatorMetricsShard metricsShard ) { this.log = logContext.logger(GroupCoordinatorShard.class); this.groupMetadataManager = groupMetadataManager; this.offsetMetadataManager = offsetMetadataManager; this.timer = timer; this.config = config; + this.coordinatorMetrics = coordinatorMetrics; + this.metricsShard = metricsShard; } /** @@ -503,6 +551,7 @@ public class GroupCoordinatorShard implements CoordinatorShard { MetadataDelta emptyDelta = new MetadataDelta(newImage); groupMetadataManager.onNewMetadataImage(newImage, emptyDelta); offsetMetadataManager.onNewMetadataImage(newImage, emptyDelta); + coordinatorMetrics.activateMetricsShard(metricsShard); groupMetadataManager.onLoaded(); scheduleGroupMetadataExpiration(); @@ -511,6 +560,7 @@ public class GroupCoordinatorShard implements CoordinatorShard { @Override public void onUnloaded() { timer.cancel(GROUP_EXPIRATION_KEY); + coordinatorMetrics.deactivateMetricsShard(metricsShard); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 5974aabbe4c..9116d9b6017 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -74,6 +74,7 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generic.GenericGroup; import org.apache.kafka.coordinator.group.generic.GenericGroupMember; import org.apache.kafka.coordinator.group.generic.GenericGroupState; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer; import org.apache.kafka.image.MetadataDelta; @@ -120,6 +121,9 @@ import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.GENERIC_GROUP_REBALANCES_SENSOR_NAME; /** * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds @@ -148,6 +152,7 @@ public class GroupMetadataManager { private int genericGroupNewMemberJoinTimeoutMs = 5 * 60 * 1000; private int genericGroupMinSessionTimeoutMs; private int genericGroupMaxSessionTimeoutMs; + private GroupCoordinatorMetricsShard metrics; Builder withLogContext(LogContext logContext) { this.logContext = logContext; @@ -224,6 +229,11 @@ public class GroupMetadataManager { return this; } + Builder withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) { + this.metrics = metrics; + return this; + } + GroupMetadataManager build() { if (logContext == null) logContext = new LogContext(); if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); @@ -234,12 +244,15 @@ public class GroupMetadataManager { throw new IllegalArgumentException("Timer must be set."); if (consumerGroupAssignors == null || consumerGroupAssignors.isEmpty()) throw new IllegalArgumentException("Assignors must be set before building."); + if (metrics == null) + throw new IllegalArgumentException("GroupCoordinatorMetricsShard must be set."); return new GroupMetadataManager( snapshotRegistry, logContext, time, timer, + metrics, consumerGroupAssignors, metadataImage, consumerGroupMaxSize, @@ -280,6 +293,11 @@ public class GroupMetadataManager { */ private final CoordinatorTimer timer; + /** + * The coordinator metrics. + */ + private final GroupCoordinatorMetricsShard metrics; + /** * The supported partition assignors keyed by their name. */ @@ -364,6 +382,7 @@ public class GroupMetadataManager { LogContext logContext, Time time, CoordinatorTimer timer, + GroupCoordinatorMetricsShard metrics, List assignors, MetadataImage metadataImage, int consumerGroupMaxSize, @@ -381,6 +400,7 @@ public class GroupMetadataManager { this.snapshotRegistry = snapshotRegistry; this.time = time; this.timer = timer; + this.metrics = metrics; this.metadataImage = metadataImage; this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity())); this.defaultAssignor = assignors.get(0); @@ -522,7 +542,7 @@ public class GroupMetadataManager { } if (group == null) { - ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId); + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); groups.put(groupId, consumerGroup); return consumerGroup; } else { @@ -560,7 +580,7 @@ public class GroupMetadataManager { } if (group == null) { - GenericGroup genericGroup = new GenericGroup(logContext, groupId, GenericGroupState.EMPTY, time); + GenericGroup genericGroup = new GenericGroup(logContext, groupId, GenericGroupState.EMPTY, time, metrics); groups.put(groupId, genericGroup); return genericGroup; } else { @@ -893,6 +913,7 @@ public class GroupMetadataManager { groupEpoch += 1; records.add(newGroupEpochRecord(groupId, groupEpoch)); log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); + metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); } group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); @@ -1332,6 +1353,7 @@ public class GroupMetadataManager { + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone."); } removeGroup(groupId); + metrics.onConsumerGroupStateTransition(consumerGroup.state(), null); } } @@ -1540,6 +1562,11 @@ public class GroupMetadataManager { if (value == null) { // Tombstone. Group should be removed. + Group group = groups.get(groupId); + if (group != null && group.type() == GENERIC) { + GenericGroup genericGroup = (GenericGroup) group; + metrics.onGenericGroupStateTransition(genericGroup.currentState(), null); + } removeGroup(groupId); } else { List loadedMembers = new ArrayList<>(); @@ -1574,6 +1601,7 @@ public class GroupMetadataManager { groupId, loadedMembers.isEmpty() ? EMPTY : STABLE, time, + metrics, value.generation(), protocolType == null || protocolType.isEmpty() ? Optional.empty() : Optional.of(protocolType), Optional.ofNullable(value.protocol()), @@ -2357,6 +2385,7 @@ public class GroupMetadataManager { } group.transitionTo(PREPARING_REBALANCE); + metrics.record(GENERIC_GROUP_REBALANCES_SENSOR_NAME); log.info("Preparing to rebalance group {} in state {} with old generation {} (reason: {}).", group.groupId(), group.currentState(), group.generationId(), reason); @@ -2823,6 +2852,7 @@ public class GroupMetadataManager { // Update group's assignment and propagate to all members. setAndPropagateAssignment(group, assignment); group.transitionTo(STABLE); + metrics.record(GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME); } } }); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 076ec24476f..9c18483e1e1 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -37,6 +37,8 @@ import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.generic.GenericGroup; import org.apache.kafka.coordinator.group.generic.GenericGroupState; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -55,6 +57,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_OFFSETS; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_DELETIONS_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME; /** * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains @@ -74,6 +79,7 @@ public class OffsetMetadataManager { private GroupMetadataManager groupMetadataManager = null; private MetadataImage metadataImage = null; private GroupCoordinatorConfig config = null; + private GroupCoordinatorMetricsShard metrics = null; Builder withLogContext(LogContext logContext) { this.logContext = logContext; @@ -105,6 +111,11 @@ public class OffsetMetadataManager { return this; } + Builder withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) { + this.metrics = metrics; + return this; + } + public OffsetMetadataManager build() { if (logContext == null) logContext = new LogContext(); if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); @@ -115,13 +126,18 @@ public class OffsetMetadataManager { throw new IllegalArgumentException("GroupMetadataManager cannot be null"); } + if (metrics == null) { + throw new IllegalArgumentException("GroupCoordinatorMetricsShard cannot be null"); + } + return new OffsetMetadataManager( snapshotRegistry, logContext, time, metadataImage, groupMetadataManager, - config + config, + metrics ); } } @@ -151,6 +167,11 @@ public class OffsetMetadataManager { */ private final GroupMetadataManager groupMetadataManager; + /** + * The coordinator metrics. + */ + private final GroupCoordinatorMetricsShard metrics; + /** * The group coordinator config. */ @@ -167,7 +188,8 @@ public class OffsetMetadataManager { Time time, MetadataImage metadataImage, GroupMetadataManager groupMetadataManager, - GroupCoordinatorConfig config + GroupCoordinatorConfig config, + GroupCoordinatorMetricsShard metrics ) { this.snapshotRegistry = snapshotRegistry; this.log = logContext.logger(OffsetMetadataManager.class); @@ -175,6 +197,7 @@ public class OffsetMetadataManager { this.metadataImage = metadataImage; this.groupMetadataManager = groupMetadataManager; this.config = config; + this.metrics = metrics; this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); } @@ -352,6 +375,10 @@ public class OffsetMetadataManager { }); }); + if (!records.isEmpty()) { + metrics.record(GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME, records.size()); + } + return new CoordinatorResult<>(records, response); } @@ -408,6 +435,7 @@ public class OffsetMetadataManager { .setPartitions(responsePartitionCollection) ); }); + metrics.record(OFFSET_DELETIONS_SENSOR_NAME, records.size()); return new CoordinatorResult<>( records, @@ -595,6 +623,7 @@ public class OffsetMetadataManager { log.info("[GroupId {}] Expiring offsets of partitions (allOffsetsExpired={}): {}", groupId, allOffsetsExpired, String.join(", ", expiredPartitions)); } + metrics.record(OFFSET_EXPIRED_SENSOR_NAME, expiredPartitions.size()); return allOffsetsExpired.get(); } @@ -710,7 +739,9 @@ public class OffsetMetadataManager { .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 0)); TimelineHashMap partitionOffsets = topicOffsets .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 0)); - partitionOffsets.put(partition, offsetAndMetadata); + if (partitionOffsets.put(partition, offsetAndMetadata) == null) { + metrics.incrementLocalGauge(NUM_OFFSETS); + } } /** @@ -734,6 +765,7 @@ public class OffsetMetadataManager { return; partitionOffsets.remove(partition); + metrics.decrementLocalGauge(NUM_OFFSETS); if (partitionOffsets.isEmpty()) topicOffsets.remove(topic); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index e5144953a97..f3c22837472 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -28,6 +28,7 @@ import org.apache.kafka.coordinator.group.OffsetExpirationCondition; import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; import org.apache.kafka.coordinator.group.Record; import org.apache.kafka.coordinator.group.RecordHelpers; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.image.ClusterImage; import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicsImage; @@ -45,6 +46,11 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.ASSIGNING; +import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.RECONCILING; +import static org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState.STABLE; + /** * A Consumer Group. All the metadata in this class are backed by * records in the __consumer_offsets partitions. @@ -143,6 +149,11 @@ public class ConsumerGroup implements Group { */ private final TimelineHashMap> currentPartitionEpoch; + /** + * The coordinator metrics. + */ + private final GroupCoordinatorMetricsShard metrics; + /** * The metadata refresh deadline. It consists of a timestamp in milliseconds together with * the group epoch at the time of setting it. The metadata refresh time is considered as a @@ -157,11 +168,12 @@ public class ConsumerGroup implements Group { public ConsumerGroup( SnapshotRegistry snapshotRegistry, - String groupId + String groupId, + GroupCoordinatorMetricsShard metrics ) { this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry); this.groupId = Objects.requireNonNull(groupId); - this.state = new TimelineObject<>(snapshotRegistry, ConsumerGroupState.EMPTY); + this.state = new TimelineObject<>(snapshotRegistry, EMPTY); this.groupEpoch = new TimelineInteger(snapshotRegistry); this.members = new TimelineHashMap<>(snapshotRegistry, 0); this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0); @@ -170,6 +182,9 @@ public class ConsumerGroup implements Group { this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry); this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0); this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0); + this.metrics = Objects.requireNonNull(metrics); + + metrics.onConsumerGroupStateTransition(null, this.state.get()); } /** @@ -672,20 +687,23 @@ public class ConsumerGroup implements Group { * Updates the current state of the group. */ private void maybeUpdateGroupState() { + ConsumerGroupState previousState = state.get(); + ConsumerGroupState newState = STABLE; if (members.isEmpty()) { - state.set(ConsumerGroupState.EMPTY); + newState = EMPTY; } else if (groupEpoch.get() > targetAssignmentEpoch.get()) { - state.set(ConsumerGroupState.ASSIGNING); + newState = ASSIGNING; } else { for (ConsumerGroupMember member : members.values()) { if (member.targetMemberEpoch() != targetAssignmentEpoch.get() || member.state() != ConsumerGroupMember.MemberState.STABLE) { - state.set(ConsumerGroupState.RECONCILING); - return; + newState = RECONCILING; + break; } } - - state.set(ConsumerGroupState.STABLE); } + + state.set(newState); + metrics.onConsumerGroupStateTransition(previousState, newState); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java index 4b408e0484a..72de3e8fd52 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java @@ -37,6 +37,7 @@ import org.apache.kafka.coordinator.group.OffsetExpirationCondition; import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; import org.apache.kafka.coordinator.group.Record; import org.apache.kafka.coordinator.group.RecordHelpers; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.slf4j.Logger; import java.nio.ByteBuffer; @@ -185,17 +186,24 @@ public class GenericGroup implements Group { */ private boolean newMemberAdded = false; + /** + * Coordinator metrics. + */ + private final GroupCoordinatorMetricsShard metrics; + public GenericGroup( LogContext logContext, String groupId, GenericGroupState initialState, - Time time + Time time, + GroupCoordinatorMetricsShard metrics ) { this( logContext, groupId, initialState, time, + metrics, 0, Optional.empty(), Optional.empty(), @@ -209,6 +217,7 @@ public class GenericGroup implements Group { String groupId, GenericGroupState initialState, Time time, + GroupCoordinatorMetricsShard metrics, int generationId, Optional protocolType, Optional protocolName, @@ -221,11 +230,13 @@ public class GenericGroup implements Group { this.state = Objects.requireNonNull(initialState); this.previousState = DEAD; this.time = Objects.requireNonNull(time); + this.metrics = Objects.requireNonNull(metrics); this.generationId = generationId; this.protocolType = protocolType; this.protocolName = protocolName; this.leaderId = leaderId; this.currentStateTimestamp = currentStateTimestamp; + metrics.onGenericGroupStateTransition(null, initialState); } /** @@ -973,6 +984,7 @@ public class GenericGroup implements Group { previousState = state; state = groupState; currentStateTimestamp = Optional.of(time.milliseconds()); + metrics.onGenericGroupStateTransition(previousState, state); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorMetrics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorMetrics.java new file mode 100644 index 00000000000..5df97505988 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorMetrics.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; +import org.apache.kafka.timeline.SnapshotRegistry; + +public abstract class CoordinatorMetrics { + + public abstract CoordinatorMetricsShard newMetricsShard(SnapshotRegistry snapshotRegistry, TopicPartition tp); + + public abstract void activateMetricsShard(CoordinatorMetricsShard shard); + + public abstract void deactivateMetricsShard(CoordinatorMetricsShard shard); + + public abstract MetricsRegistry registry(); + + public static MetricName getMetricName(String group, String type, String name) { + return KafkaYammerMetrics.getMetricName(group, type, name); + } + + public abstract void onUpdateLastCommittedOffset(TopicPartition tp, long offset); +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorMetricsShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorMetricsShard.java new file mode 100644 index 00000000000..e1abd914137 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorMetricsShard.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import com.yammer.metrics.core.MetricName; +import org.apache.kafka.common.TopicPartition; + +/** + * A CoordinatorMetricsShard is mapped to a single CoordinatorShard. For gauges, each metrics shard increments/decrements + * based on the operations performed. Then, {@link CoordinatorMetrics} will perform aggregations across all shards. + * + * For sensors, each shard individually records the observed values. + */ +public interface CoordinatorMetricsShard { + /** + * Increment a global gauge. + * + * @param metricName the metric name. + */ + void incrementGlobalGauge(MetricName metricName); + + /** + * Increment a local gauge. + * + * @param metricName the metric name. + */ + void incrementLocalGauge(MetricName metricName); + + /** + * Decrement a global gauge. + * + * @param metricName the metric name. + */ + void decrementGlobalGauge(MetricName metricName); + + /** + * Decrement a local gauge. + * + * @param metricName the metric name. + */ + void decrementLocalGauge(MetricName metricName); + + /** + * Obtain the current value of a global gauge. + * + * @param metricName the metric name. + */ + long globalGaugeValue(MetricName metricName); + + /** + * Obtain the current value of a local gauge. + * + * @param metricName the metric name. + */ + long localGaugeValue(MetricName metricName); + + /** + * Increment the value of a sensor. + * + * @param sensorName the sensor name. + */ + void record(String sensorName); + + /** + * Record a sensor with a value. + * + * @param sensorName the sensor name. + * @param val the value to record. + */ + void record(String sensorName, double val); + + /** + * @return The topic partition. + */ + TopicPartition topicPartition(); + + /** + * Commits all gauges backed by the snapshot registry. + * + * @param offset The last committed offset. + */ + void commitUpTo(long offset); +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java new file mode 100644 index 00000000000..bef36089edb --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; +import org.apache.kafka.timeline.SnapshotRegistry; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + +/** + * These are the metrics which are managed by the {@link org.apache.kafka.coordinator.group.GroupMetadataManager} class. + * They generally pertain to aspects of group management, such as the number of groups in different states. + */ +public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoCloseable { + public static final String METRICS_GROUP = "group-coordinator-metrics"; + + public final static MetricName NUM_OFFSETS = getMetricName( + "GroupMetadataManager", "NumOffsets"); + public final static MetricName NUM_GENERIC_GROUPS = getMetricName( + "GroupMetadataManager", "NumGroups"); + public final static MetricName NUM_GENERIC_GROUPS_PREPARING_REBALANCE = getMetricName( + "GroupMetadataManager", "NumGroupsPreparingRebalance"); + public final static MetricName NUM_GENERIC_GROUPS_COMPLETING_REBALANCE = getMetricName( + "GroupMetadataManager", "NumGroupsCompletingRebalance"); + public final static MetricName NUM_GENERIC_GROUPS_STABLE = getMetricName( + "GroupMetadataManager", "NumGroupsStable"); + public final static MetricName NUM_GENERIC_GROUPS_DEAD = getMetricName( + "GroupMetadataManager", "NumGroupsDead"); + public final static MetricName NUM_GENERIC_GROUPS_EMPTY = getMetricName( + "GroupMetadataManager", "NumGroupsEmpty"); + public final static MetricName NUM_CONSUMER_GROUPS = getMetricName( + "GroupMetadataManager", "NumConsumerGroups"); + public final static MetricName NUM_CONSUMER_GROUPS_EMPTY = getMetricName( + "GroupMetadataManager", "NumConsumerGroupsEmpty"); + public final static MetricName NUM_CONSUMER_GROUPS_ASSIGNING = getMetricName( + "GroupMetadataManager", "NumConsumerGroupsAssigning"); + public final static MetricName NUM_CONSUMER_GROUPS_RECONCILING = getMetricName( + "GroupMetadataManager", "NumConsumerGroupsReconciling"); + public final static MetricName NUM_CONSUMER_GROUPS_STABLE = getMetricName( + "GroupMetadataManager", "NumConsumerGroupsStable"); + public final static MetricName NUM_CONSUMER_GROUPS_DEAD = getMetricName( + "GroupMetadataManager", "NumConsumerGroupsDead"); + + public static final String OFFSET_COMMITS_SENSOR_NAME = "OffsetCommits"; + public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired"; + public static final String OFFSET_DELETIONS_SENSOR_NAME = "OffsetDeletions"; + public static final String GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME = "CompletedRebalances"; + public static final String GENERIC_GROUP_REBALANCES_SENSOR_NAME = "GenericGroupRebalances"; + public static final String CONSUMER_GROUP_REBALANCES_SENSOR_NAME = "ConsumerGroupRebalances"; + + private final MetricsRegistry registry; + private final Metrics metrics; + private final Map shards = new HashMap<>(); + private static final AtomicLong NUM_GENERIC_GROUPS_PREPARING_REBALANCE_COUNTER = new AtomicLong(0); + private static final AtomicLong NUM_GENERIC_GROUPS_COMPLETING_REBALANCE_COUNTER = new AtomicLong(0); + private static final AtomicLong NUM_GENERIC_GROUPS_STABLE_COUNTER = new AtomicLong(0); + private static final AtomicLong NUM_GENERIC_GROUPS_DEAD_COUNTER = new AtomicLong(0); + private static final AtomicLong NUM_GENERIC_GROUPS_EMPTY_COUNTER = new AtomicLong(0); + + /** + * Global sensors. These are shared across all metrics shards. + */ + public final Map globalSensors; + + /** + * Global gauge counters. These are shared across all metrics shards. + */ + public static final Map GLOBAL_GAUGES = Collections.unmodifiableMap(Utils.mkMap( + Utils.mkEntry(NUM_GENERIC_GROUPS_PREPARING_REBALANCE.getName(), NUM_GENERIC_GROUPS_PREPARING_REBALANCE_COUNTER), + Utils.mkEntry(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE.getName(), NUM_GENERIC_GROUPS_COMPLETING_REBALANCE_COUNTER), + Utils.mkEntry(NUM_GENERIC_GROUPS_STABLE.getName(), NUM_GENERIC_GROUPS_STABLE_COUNTER), + Utils.mkEntry(NUM_GENERIC_GROUPS_DEAD.getName(), NUM_GENERIC_GROUPS_DEAD_COUNTER), + Utils.mkEntry(NUM_GENERIC_GROUPS_EMPTY.getName(), NUM_GENERIC_GROUPS_EMPTY_COUNTER) + )); + + public GroupCoordinatorMetrics() { + this(KafkaYammerMetrics.defaultRegistry(), new Metrics()); + } + + public GroupCoordinatorMetrics(MetricsRegistry registry, Metrics metrics) { + this.registry = Objects.requireNonNull(registry); + this.metrics = Objects.requireNonNull(metrics); + + registerGauges(); + + Sensor offsetCommitsSensor = metrics.sensor(OFFSET_COMMITS_SENSOR_NAME); + offsetCommitsSensor.add(new Meter( + metrics.metricName("offset-commit-rate", + METRICS_GROUP, + "The rate of committed offsets"), + metrics.metricName("offset-commit-count", + METRICS_GROUP, + "The total number of committed offsets"))); + + Sensor offsetExpiredSensor = metrics.sensor(OFFSET_EXPIRED_SENSOR_NAME); + offsetExpiredSensor.add(new Meter( + metrics.metricName("offset-expiration-rate", + METRICS_GROUP, + "The rate of expired offsets"), + metrics.metricName("offset-expiration-count", + METRICS_GROUP, + "The total number of expired offsets"))); + + Sensor offsetDeletionsSensor = metrics.sensor(OFFSET_DELETIONS_SENSOR_NAME); + offsetDeletionsSensor.add(new Meter( + metrics.metricName("offset-deletion-rate", + METRICS_GROUP, + "The rate of administrative deleted offsets"), + metrics.metricName("offset-deletion-count", + METRICS_GROUP, + "The total number of administrative deleted offsets"))); + + Sensor genericGroupCompletedRebalancesSensor = metrics.sensor(GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME); + genericGroupCompletedRebalancesSensor.add(new Meter( + metrics.metricName("group-completed-rebalance-rate", + METRICS_GROUP, + "The rate of generic group completed rebalances"), + metrics.metricName("group-completed-rebalance-count", + METRICS_GROUP, + "The total number of generic group completed rebalances"))); + + Sensor genericGroupPreparingRebalancesSensor = metrics.sensor(GENERIC_GROUP_REBALANCES_SENSOR_NAME); + genericGroupPreparingRebalancesSensor.add(new Meter( + metrics.metricName("group-rebalance-rate", + METRICS_GROUP, + "The rate of generic group preparing rebalances"), + metrics.metricName("group-rebalance-count", + METRICS_GROUP, + "The total number of generic group preparing rebalances"))); + + Sensor consumerGroupRebalanceSensor = metrics.sensor(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); + consumerGroupRebalanceSensor.add(new Meter( + metrics.metricName("consumer-group-rebalance-rate", + METRICS_GROUP, + "The rate of consumer group rebalances"), + metrics.metricName("consumer-group-rebalance-count", + METRICS_GROUP, + "The total number of consumer group rebalances"))); + + globalSensors = Collections.unmodifiableMap(Utils.mkMap( + Utils.mkEntry(OFFSET_COMMITS_SENSOR_NAME, offsetCommitsSensor), + Utils.mkEntry(OFFSET_EXPIRED_SENSOR_NAME, offsetExpiredSensor), + Utils.mkEntry(OFFSET_DELETIONS_SENSOR_NAME, offsetDeletionsSensor), + Utils.mkEntry(GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, genericGroupCompletedRebalancesSensor), + Utils.mkEntry(GENERIC_GROUP_REBALANCES_SENSOR_NAME, genericGroupPreparingRebalancesSensor), + Utils.mkEntry(CONSUMER_GROUP_REBALANCES_SENSOR_NAME, consumerGroupRebalanceSensor) + )); + } + + public Long numOffsets() { + return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_OFFSETS)).sum(); + } + + public Long numGenericGroups() { + return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_GENERIC_GROUPS)).sum(); + } + + public Long numGenericGroupsPreparingRebalanceCount() { + return NUM_GENERIC_GROUPS_PREPARING_REBALANCE_COUNTER.get(); + } + + public Long numGenericGroupsCompletingRebalanceCount() { + return NUM_GENERIC_GROUPS_COMPLETING_REBALANCE_COUNTER.get(); + } + public Long numGenericGroupsStableCount() { + return NUM_GENERIC_GROUPS_STABLE_COUNTER.get(); + } + + public Long numGenericGroupsDeadCount() { + return NUM_GENERIC_GROUPS_DEAD_COUNTER.get(); + } + + public Long numGenericGroupsEmptyCount() { + return NUM_GENERIC_GROUPS_EMPTY_COUNTER.get(); + } + + public long numConsumerGroups() { + return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_CONSUMER_GROUPS)).sum(); + } + + public long numConsumerGroupsEmpty() { + return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY)).sum(); + } + + public long numConsumerGroupsAssigning() { + return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_CONSUMER_GROUPS_ASSIGNING)).sum(); + } + + public long numConsumerGroupsReconciling() { + return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_CONSUMER_GROUPS_RECONCILING)).sum(); + } + + public long numConsumerGroupsStable() { + return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE)).sum(); + } + + public long numConsumerGroupsDead() { + return shards.values().stream().mapToLong(shard -> shard.localGaugeValue(NUM_CONSUMER_GROUPS_DEAD)).sum(); + } + + @Override + public void close() { + Arrays.asList( + NUM_OFFSETS, + NUM_GENERIC_GROUPS, + NUM_GENERIC_GROUPS_PREPARING_REBALANCE, + NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, + NUM_GENERIC_GROUPS_STABLE, + NUM_GENERIC_GROUPS_DEAD, + NUM_GENERIC_GROUPS_EMPTY, + NUM_CONSUMER_GROUPS, + NUM_CONSUMER_GROUPS_EMPTY, + NUM_CONSUMER_GROUPS_ASSIGNING, + NUM_CONSUMER_GROUPS_RECONCILING, + NUM_CONSUMER_GROUPS_STABLE, + NUM_CONSUMER_GROUPS_DEAD + ).forEach(registry::removeMetric); + + Arrays.asList( + OFFSET_COMMITS_SENSOR_NAME, + OFFSET_EXPIRED_SENSOR_NAME, + OFFSET_DELETIONS_SENSOR_NAME, + GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, + GENERIC_GROUP_REBALANCES_SENSOR_NAME, + CONSUMER_GROUP_REBALANCES_SENSOR_NAME + ).forEach(metrics::removeSensor); + } + + @Override + public GroupCoordinatorMetricsShard newMetricsShard(SnapshotRegistry snapshotRegistry, TopicPartition tp) { + return new GroupCoordinatorMetricsShard(snapshotRegistry, globalSensors, GLOBAL_GAUGES, tp); + } + + @Override + public void activateMetricsShard(CoordinatorMetricsShard shard) { + shards.put(shard.topicPartition(), shard); + } + + @Override + public void deactivateMetricsShard(CoordinatorMetricsShard shard) { + shards.remove(shard.topicPartition()); + } + + @Override + public MetricsRegistry registry() { + return this.registry; + } + + @Override + public void onUpdateLastCommittedOffset(TopicPartition tp, long offset) { + CoordinatorMetricsShard shard = shards.get(tp); + if (shard != null) { + shard.commitUpTo(offset); + } + } + + public static MetricName getMetricName(String type, String name) { + return getMetricName("kafka.coordinator.group", type, name); + } + + private void registerGauges() { + registry.newGauge(NUM_OFFSETS, new Gauge() { + @Override + public Long value() { + return numOffsets(); + } + }); + + registry.newGauge(NUM_GENERIC_GROUPS, new Gauge() { + @Override + public Long value() { + return numGenericGroups(); + } + }); + + registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new Gauge() { + @Override + public Long value() { + return numGenericGroupsPreparingRebalanceCount(); + } + }); + + registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new Gauge() { + @Override + public Long value() { + return numGenericGroupsCompletingRebalanceCount(); + } + }); + + registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new Gauge() { + @Override + public Long value() { + return numGenericGroupsStableCount(); + } + }); + + registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new Gauge() { + @Override + public Long value() { + return numGenericGroupsDeadCount(); + } + }); + + registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new Gauge() { + @Override + public Long value() { + return numGenericGroupsEmptyCount(); + } + }); + + registry.newGauge(NUM_CONSUMER_GROUPS, new Gauge() { + @Override + public Long value() { + return numConsumerGroups(); + } + }); + + registry.newGauge(NUM_CONSUMER_GROUPS_EMPTY, new Gauge() { + @Override + public Long value() { + return numConsumerGroupsEmpty(); + } + }); + + registry.newGauge(NUM_CONSUMER_GROUPS_ASSIGNING, new Gauge() { + @Override + public Long value() { + return numConsumerGroupsAssigning(); + } + }); + + registry.newGauge(NUM_CONSUMER_GROUPS_RECONCILING, new Gauge() { + @Override + public Long value() { + return numConsumerGroupsReconciling(); + } + }); + + registry.newGauge(NUM_CONSUMER_GROUPS_STABLE, new Gauge() { + @Override + public Long value() { + return numConsumerGroupsStable(); + } + }); + + registry.newGauge(NUM_CONSUMER_GROUPS_DEAD, new Gauge() { + @Override + public Long value() { + return numConsumerGroupsDead(); + } + }); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java new file mode 100644 index 00000000000..1741963acf3 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import com.yammer.metrics.core.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.generic.GenericGroupState; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineLong; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_ASSIGNING; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_DEAD; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_EMPTY; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_RECONCILING; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_STABLE; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_DEAD; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_EMPTY; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_PREPARING_REBALANCE; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_STABLE; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_OFFSETS; + +/** + * This class is mapped to a single {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}. It will + * record all metrics that the shard handles with respect to {@link org.apache.kafka.coordinator.group.OffsetMetadataManager} + * and {@link org.apache.kafka.coordinator.group.GroupMetadataManager} operations. + * + * Local gauges will be recorded in this class which will be gathered by {@link GroupCoordinatorMetrics} to + * report. + */ +public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard { + + /** + * This class represents a gauge counter for this shard. The TimelineLong object represents a gauge backed by + * the snapshot registry. Once we commit to a certain offset in the snapshot registry, we write the given + * TimelineLong's value to the AtomicLong. This AtomicLong represents the actual gauge counter that is queried + * when reporting the value to {@link GroupCoordinatorMetrics}. + */ + private static class TimelineGaugeCounter { + + final TimelineLong timelineLong; + + final AtomicLong atomicLong; + + public TimelineGaugeCounter(TimelineLong timelineLong, AtomicLong atomicLong) { + this.timelineLong = timelineLong; + this.atomicLong = atomicLong; + } + } + + /** + * Local timeline gauge counters keyed by the metric name. + */ + private final Map localGauges; + + /** + * All sensors keyed by the sensor name. A Sensor object is shared across all metrics shards. + */ + private final Map globalSensors; + + /** + * Global gauge counters keyed by the metric name. The same counter is shared across all metrics shards. + */ + private final Map globalGauges; + + /** + * The topic partition. + */ + private final TopicPartition topicPartition; + + public GroupCoordinatorMetricsShard( + SnapshotRegistry snapshotRegistry, + Map globalSensors, + Map globalGauges, + TopicPartition topicPartition + ) { + Objects.requireNonNull(snapshotRegistry); + TimelineLong numOffsetsTimeline = new TimelineLong(snapshotRegistry); + TimelineLong numGenericGroupsTimeline = new TimelineLong(snapshotRegistry); + TimelineLong numConsumerGroupsTimeline = new TimelineLong(snapshotRegistry); + TimelineLong numConsumerGroupsEmptyTimeline = new TimelineLong(snapshotRegistry); + TimelineLong numConsumerGroupsAssigningTimeline = new TimelineLong(snapshotRegistry); + TimelineLong numConsumerGroupsReconcilingTimeline = new TimelineLong(snapshotRegistry); + TimelineLong numConsumerGroupsStableTimeline = new TimelineLong(snapshotRegistry); + TimelineLong numConsumerGroupsDeadTimeline = new TimelineLong(snapshotRegistry); + + this.localGauges = Collections.unmodifiableMap(Utils.mkMap( + Utils.mkEntry(NUM_OFFSETS.getName(), + new TimelineGaugeCounter(numOffsetsTimeline, new AtomicLong(0))), + Utils.mkEntry(NUM_GENERIC_GROUPS.getName(), + new TimelineGaugeCounter(numGenericGroupsTimeline, new AtomicLong(0))), + Utils.mkEntry(NUM_CONSUMER_GROUPS.getName(), + new TimelineGaugeCounter(numConsumerGroupsTimeline, new AtomicLong(0))), + Utils.mkEntry(NUM_CONSUMER_GROUPS_EMPTY.getName(), + new TimelineGaugeCounter(numConsumerGroupsEmptyTimeline, new AtomicLong(0))), + Utils.mkEntry(NUM_CONSUMER_GROUPS_ASSIGNING.getName(), + new TimelineGaugeCounter(numConsumerGroupsAssigningTimeline, new AtomicLong(0))), + Utils.mkEntry(NUM_CONSUMER_GROUPS_RECONCILING.getName(), + new TimelineGaugeCounter(numConsumerGroupsReconcilingTimeline, new AtomicLong(0))), + Utils.mkEntry(NUM_CONSUMER_GROUPS_STABLE.getName(), + new TimelineGaugeCounter(numConsumerGroupsStableTimeline, new AtomicLong(0))), + Utils.mkEntry(NUM_CONSUMER_GROUPS_DEAD.getName(), + new TimelineGaugeCounter(numConsumerGroupsDeadTimeline, new AtomicLong(0))) + )); + + this.globalSensors = Objects.requireNonNull(globalSensors); + this.globalGauges = Objects.requireNonNull(globalGauges); + this.topicPartition = Objects.requireNonNull(topicPartition); + } + + @Override + public void incrementGlobalGauge(MetricName metricName) { + AtomicLong gaugeCounter = globalGauges.get(metricName.getName()); + if (gaugeCounter != null) { + gaugeCounter.incrementAndGet(); + } + } + + @Override + public void incrementLocalGauge(MetricName metricName) { + TimelineGaugeCounter gaugeCounter = localGauges.get(metricName.getName()); + if (gaugeCounter != null) { + synchronized (gaugeCounter.timelineLong) { + gaugeCounter.timelineLong.increment(); + } + } + } + + @Override + public void decrementGlobalGauge(MetricName metricName) { + AtomicLong gaugeCounter = globalGauges.get(metricName.getName()); + if (gaugeCounter != null) { + gaugeCounter.decrementAndGet(); + } + } + + @Override + public void decrementLocalGauge(MetricName metricName) { + TimelineGaugeCounter gaugeCounter = localGauges.get(metricName.getName()); + if (gaugeCounter != null) { + synchronized (gaugeCounter.timelineLong) { + gaugeCounter.timelineLong.decrement(); + } + } + } + + @Override + public long globalGaugeValue(MetricName metricName) { + AtomicLong gaugeCounter = globalGauges.get(metricName.getName()); + if (gaugeCounter != null) { + return gaugeCounter.get(); + } + return 0; + } + + @Override + public long localGaugeValue(MetricName metricName) { + TimelineGaugeCounter gaugeCounter = localGauges.get(metricName.getName()); + if (gaugeCounter != null) { + return gaugeCounter.atomicLong.get(); + } + return 0; + } + + @Override + public void record(String sensorName) { + Sensor sensor = globalSensors.get(sensorName); + if (sensor != null) { + sensor.record(); + } + } + + @Override + public void record(String sensorName, double val) { + Sensor sensor = globalSensors.get(sensorName); + if (sensor != null) { + sensor.record(val); + } + } + + @Override + public TopicPartition topicPartition() { + return this.topicPartition; + } + + @Override + public void commitUpTo(long offset) { + this.localGauges.forEach((__, gaugeCounter) -> { + long value; + synchronized (gaugeCounter.timelineLong) { + value = gaugeCounter.timelineLong.get(offset); + } + gaugeCounter.atomicLong.set(value); + }); + } + + /** + * Called when a generic group's state has changed. Increment/decrement + * the counter accordingly. + * + * @param oldState The previous state. null value means that it's a new group. + * @param newState The next state. null value means that the group has been removed. + */ + public void onGenericGroupStateTransition(GenericGroupState oldState, GenericGroupState newState) { + if (newState != null) { + switch (newState) { + case PREPARING_REBALANCE: + incrementGlobalGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE); + break; + case COMPLETING_REBALANCE: + incrementGlobalGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE); + break; + case STABLE: + incrementGlobalGauge(NUM_GENERIC_GROUPS_STABLE); + break; + case DEAD: + incrementGlobalGauge(NUM_GENERIC_GROUPS_DEAD); + break; + case EMPTY: + incrementGlobalGauge(NUM_GENERIC_GROUPS_EMPTY); + } + } else { + decrementLocalGauge(NUM_GENERIC_GROUPS); + } + + if (oldState != null) { + switch (oldState) { + case PREPARING_REBALANCE: + decrementGlobalGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE); + break; + case COMPLETING_REBALANCE: + decrementGlobalGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE); + break; + case STABLE: + decrementGlobalGauge(NUM_GENERIC_GROUPS_STABLE); + break; + case DEAD: + decrementGlobalGauge(NUM_GENERIC_GROUPS_DEAD); + break; + case EMPTY: + decrementGlobalGauge(NUM_GENERIC_GROUPS_EMPTY); + } + } else { + incrementLocalGauge(NUM_GENERIC_GROUPS); + } + } + + /** + * Called when a consumer group's state has changed. Increment/decrement + * the counter accordingly. + * + * @param oldState The previous state. null value means that it's a new group. + * @param newState The next state. null value means that the group has been removed. + */ + public void onConsumerGroupStateTransition( + ConsumerGroup.ConsumerGroupState oldState, + ConsumerGroup.ConsumerGroupState newState + ) { + if (newState != null) { + switch (newState) { + case EMPTY: + incrementLocalGauge(NUM_CONSUMER_GROUPS_EMPTY); + break; + case ASSIGNING: + incrementLocalGauge(NUM_CONSUMER_GROUPS_ASSIGNING); + break; + case RECONCILING: + incrementLocalGauge(NUM_CONSUMER_GROUPS_RECONCILING); + break; + case STABLE: + incrementLocalGauge(NUM_CONSUMER_GROUPS_STABLE); + break; + case DEAD: + incrementLocalGauge(NUM_CONSUMER_GROUPS_DEAD); + } + } else { + decrementLocalGauge(NUM_CONSUMER_GROUPS); + } + + if (oldState != null) { + switch (oldState) { + case EMPTY: + decrementLocalGauge(NUM_CONSUMER_GROUPS_EMPTY); + break; + case ASSIGNING: + decrementLocalGauge(NUM_CONSUMER_GROUPS_ASSIGNING); + break; + case RECONCILING: + decrementLocalGauge(NUM_CONSUMER_GROUPS_RECONCILING); + break; + case STABLE: + decrementLocalGauge(NUM_CONSUMER_GROUPS_STABLE); + break; + case DEAD: + decrementLocalGauge(NUM_CONSUMER_GROUPS_DEAD); + } + } else { + incrementLocalGauge(NUM_CONSUMER_GROUPS); + } + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java index 2f75fa59b55..2812a0e3ec5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics; +import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics; import org.apache.kafka.deferred.DeferredEvent; import org.apache.kafka.deferred.DeferredEventQueue; import org.apache.kafka.image.MetadataDelta; @@ -91,6 +92,7 @@ public class CoordinatorRuntime, U> implements Aut private Time time = Time.SYSTEM; private Timer timer; private CoordinatorRuntimeMetrics runtimeMetrics; + private CoordinatorMetrics coordinatorMetrics; public Builder withLogPrefix(String logPrefix) { this.logPrefix = logPrefix; @@ -137,6 +139,11 @@ public class CoordinatorRuntime, U> implements Aut return this; } + public Builder withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) { + this.coordinatorMetrics = coordinatorMetrics; + return this; + } + public CoordinatorRuntime build() { if (logPrefix == null) logPrefix = ""; @@ -156,6 +163,8 @@ public class CoordinatorRuntime, U> implements Aut throw new IllegalArgumentException("Timer must be set."); if (runtimeMetrics == null) throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set."); + if (coordinatorMetrics == null) + throw new IllegalArgumentException("CoordinatorMetrics must be set."); return new CoordinatorRuntime<>( logPrefix, @@ -166,7 +175,8 @@ public class CoordinatorRuntime, U> implements Aut coordinatorShardBuilderSupplier, time, timer, - runtimeMetrics + runtimeMetrics, + coordinatorMetrics ); } } @@ -497,6 +507,7 @@ public class CoordinatorRuntime, U> implements Aut lastCommittedOffset = offset; deferredEventQueue.completeUpTo(offset); snapshotRegistry.deleteSnapshotsUpTo(offset); + coordinatorMetrics.onUpdateLastCommittedOffset(tp, offset); } /** @@ -510,8 +521,8 @@ public class CoordinatorRuntime, U> implements Aut if (!newState.canTransitionFrom(state)) { throw new IllegalStateException("Cannot transition from " + state + " to " + newState); } - CoordinatorState oldState = state; + log.debug("Transition from {} to {}.", state, newState); switch (newState) { case LOADING: @@ -525,6 +536,8 @@ public class CoordinatorRuntime, U> implements Aut .withSnapshotRegistry(snapshotRegistry) .withTime(time) .withTimer(timer) + .withCoordinatorMetrics(coordinatorMetrics) + .withTopicPartition(tp) .build(); break; @@ -1046,6 +1059,11 @@ public class CoordinatorRuntime, U> implements Aut */ private final CoordinatorRuntimeMetrics runtimeMetrics; + /** + * The coordinator metrics. + */ + private final CoordinatorMetrics coordinatorMetrics; + /** * Atomic boolean indicating whether the runtime is running. */ @@ -1077,7 +1095,8 @@ public class CoordinatorRuntime, U> implements Aut CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier, Time time, Timer timer, - CoordinatorRuntimeMetrics runtimeMetrics + CoordinatorRuntimeMetrics runtimeMetrics, + CoordinatorMetrics coordinatorMetrics ) { this.logPrefix = logPrefix; this.logContext = logContext; @@ -1091,6 +1110,7 @@ public class CoordinatorRuntime, U> implements Aut this.loader = loader; this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier; this.runtimeMetrics = runtimeMetrics; + this.coordinatorMetrics = coordinatorMetrics; } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShardBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShardBuilder.java index df2b514b63c..699b0b01924 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShardBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShardBuilder.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.coordinator.group.runtime; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics; import org.apache.kafka.timeline.SnapshotRegistry; @@ -74,6 +76,28 @@ public interface CoordinatorShardBuilder, U> { CoordinatorTimer timer ); + /** + * Sets the coordinator metrics. + * + * @param coordinatorMetrics The coordinator metrics. + * + * @return The builder. + */ + CoordinatorShardBuilder withCoordinatorMetrics( + CoordinatorMetrics coordinatorMetrics + ); + + /** + * Sets the topic partition. + * + * @param topicPartition The topic partition. + * + * @return The builder. + */ + CoordinatorShardBuilder withTopicPartition( + TopicPartition topicPartition + ); + /** * @return The built coordinator. */ diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 98174b07508..752cca39f3e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -62,6 +62,7 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime; import org.apache.kafka.server.record.BrokerCompressionType; import org.apache.kafka.server.util.FutureUtils; @@ -128,7 +129,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); service.startup(() -> 1); @@ -143,7 +145,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData() @@ -167,7 +170,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData() @@ -215,7 +219,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData() @@ -248,7 +253,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); assertThrows(CoordinatorNotAvailableException.class, @@ -265,7 +271,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); Properties expectedProperties = new Properties(); @@ -282,7 +289,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); assertThrows(CoordinatorNotAvailableException.class, @@ -303,7 +311,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); assertThrows(CoordinatorNotAvailableException.class, @@ -324,7 +333,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); JoinGroupRequestData request = new JoinGroupRequestData() @@ -355,7 +365,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); JoinGroupRequestData request = new JoinGroupRequestData() @@ -388,7 +399,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); service.startup(() -> 1); @@ -433,7 +445,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); JoinGroupRequestData request = new JoinGroupRequestData() @@ -458,7 +471,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); SyncGroupRequestData request = new SyncGroupRequestData() @@ -489,7 +503,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); SyncGroupRequestData request = new SyncGroupRequestData() @@ -523,7 +538,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); service.startup(() -> 1); @@ -551,7 +567,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); SyncGroupRequestData request = new SyncGroupRequestData() @@ -576,7 +593,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); HeartbeatRequestData request = new HeartbeatRequestData() @@ -607,7 +625,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); HeartbeatRequestData request = new HeartbeatRequestData() @@ -638,7 +657,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); HeartbeatRequestData request = new HeartbeatRequestData() @@ -672,7 +692,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); HeartbeatRequestData request = new HeartbeatRequestData() @@ -696,7 +717,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); int partitionCount = 3; service.startup(() -> partitionCount); @@ -746,7 +768,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); int partitionCount = 3; service.startup(() -> partitionCount); @@ -797,7 +820,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); int partitionCount = 3; service.startup(() -> partitionCount); @@ -838,7 +862,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); int partitionCount = 0; service.startup(() -> partitionCount); @@ -862,7 +887,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); ListGroupsRequestData request = new ListGroupsRequestData(); @@ -885,7 +911,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + mock(GroupCoordinatorMetrics.class) ); int partitionCount = 2; service.startup(() -> partitionCount); @@ -926,7 +953,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + mock(GroupCoordinatorMetrics.class) ); int partitionCount = 1; service.startup(() -> partitionCount); @@ -958,7 +986,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + mock(GroupCoordinatorMetrics.class) ); int partitionCount = 1; service.startup(() -> partitionCount); @@ -989,7 +1018,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); CompletableFuture> future = service.describeGroups( @@ -1015,7 +1045,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); service.startup(() -> 1); @@ -1068,7 +1099,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); OffsetFetchRequestData.OffsetFetchRequestGroup request = @@ -1101,7 +1133,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); service.startup(() -> 1); @@ -1151,7 +1184,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); OffsetFetchRequestData.OffsetFetchRequestGroup request = @@ -1178,7 +1212,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); LeaveGroupRequestData request = new LeaveGroupRequestData() @@ -1209,7 +1244,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); LeaveGroupRequestData request = new LeaveGroupRequestData() @@ -1261,7 +1297,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); LeaveGroupRequestData request = new LeaveGroupRequestData() @@ -1285,7 +1322,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + mock(GroupCoordinatorMetrics.class) ); service.startup(() -> 1); @@ -1334,7 +1372,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + mock(GroupCoordinatorMetrics.class) ); service.startup(() -> 1); @@ -1378,7 +1417,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + mock(GroupCoordinatorMetrics.class) ); service.startup(() -> 1); @@ -1419,7 +1459,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + new GroupCoordinatorMetrics() ); OffsetDeleteRequestData request = new OffsetDeleteRequestData() @@ -1444,7 +1485,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + mock(GroupCoordinatorMetrics.class) ); service.startup(() -> 3); @@ -1519,7 +1561,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + mock(GroupCoordinatorMetrics.class) ); service.startup(() -> 1); @@ -1552,7 +1595,8 @@ public class GroupCoordinatorServiceTest { GroupCoordinatorService service = new GroupCoordinatorService( new LogContext(), createConfig(), - runtime + runtime, + mock(GroupCoordinatorMetrics.class) ); CompletableFuture future = service.deleteGroups( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index ee8003b29bb..b9e45bdeb2c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -43,6 +43,8 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics; +import org.apache.kafka.coordinator.group.metrics.CoordinatorMetricsShard; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -80,12 +82,16 @@ public class GroupCoordinatorShardTest { public void testConsumerGroupHeartbeat() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); RequestContext context = requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT); @@ -107,12 +113,16 @@ public class GroupCoordinatorShardTest { public void testCommitOffset() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); RequestContext context = requestContext(ApiKeys.OFFSET_COMMIT); @@ -139,7 +149,9 @@ public class GroupCoordinatorShardTest { groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + mock(CoordinatorMetrics.class), + mock(CoordinatorMetricsShard.class) ); RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); @@ -193,7 +205,9 @@ public class GroupCoordinatorShardTest { groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + mock(CoordinatorMetrics.class), + mock(CoordinatorMetricsShard.class) ); RequestContext context = requestContext(ApiKeys.DELETE_GROUPS); @@ -253,12 +267,16 @@ public class GroupCoordinatorShardTest { public void testReplayOffsetCommit() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); OffsetCommitKey key = new OffsetCommitKey(); @@ -281,12 +299,16 @@ public class GroupCoordinatorShardTest { public void testReplayOffsetCommitWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); OffsetCommitKey key = new OffsetCommitKey(); @@ -308,12 +330,16 @@ public class GroupCoordinatorShardTest { public void testReplayConsumerGroupMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey(); @@ -331,12 +357,16 @@ public class GroupCoordinatorShardTest { public void testReplayConsumerGroupMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey(); @@ -353,12 +383,16 @@ public class GroupCoordinatorShardTest { public void testReplayConsumerGroupPartitionMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey(); @@ -376,12 +410,16 @@ public class GroupCoordinatorShardTest { public void testReplayConsumerGroupPartitionMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey(); @@ -398,12 +436,16 @@ public class GroupCoordinatorShardTest { public void testReplayConsumerGroupMemberMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey(); @@ -421,12 +463,16 @@ public class GroupCoordinatorShardTest { public void testReplayConsumerGroupMemberMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey(); @@ -443,12 +489,16 @@ public class GroupCoordinatorShardTest { public void testReplayConsumerGroupTargetAssignmentMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey(); @@ -466,12 +516,16 @@ public class GroupCoordinatorShardTest { public void testReplayConsumerGroupTargetAssignmentMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey(); @@ -488,12 +542,16 @@ public class GroupCoordinatorShardTest { public void testReplayConsumerGroupTargetAssignmentMember() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey(); @@ -511,12 +569,16 @@ public class GroupCoordinatorShardTest { public void testReplayConsumerGroupTargetAssignmentMemberKeyWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey(); @@ -533,12 +595,16 @@ public class GroupCoordinatorShardTest { public void testReplayConsumerGroupCurrentMemberAssignment() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey(); @@ -556,12 +622,16 @@ public class GroupCoordinatorShardTest { public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey(); @@ -578,12 +648,16 @@ public class GroupCoordinatorShardTest { public void testReplayKeyCannotBeNull() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); assertThrows(NullPointerException.class, () -> coordinator.replay(new Record(null, null))); @@ -593,12 +667,16 @@ public class GroupCoordinatorShardTest { public void testReplayWithUnsupportedVersion() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey(); @@ -615,12 +693,16 @@ public class GroupCoordinatorShardTest { MetadataImage image = MetadataImage.EMPTY; GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); coordinator.onLoaded(image); @@ -637,12 +719,16 @@ public class GroupCoordinatorShardTest { public void testReplayGroupMetadata() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); GroupMetadataKey key = new GroupMetadataKey(); @@ -660,12 +746,16 @@ public class GroupCoordinatorShardTest { public void testReplayGroupMetadataWithNullValue() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); GroupCoordinatorShard coordinator = new GroupCoordinatorShard( new LogContext(), groupMetadataManager, offsetMetadataManager, new MockCoordinatorTimer<>(new MockTime()), - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard ); GroupMetadataKey key = new GroupMetadataKey(); @@ -689,7 +779,9 @@ public class GroupCoordinatorShardTest { groupMetadataManager, offsetMetadataManager, timer, - GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 1000L, 24 * 60) + GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 1000L, 24 * 60), + mock(CoordinatorMetrics.class), + mock(CoordinatorMetricsShard.class) ); MetadataImage image = MetadataImage.EMPTY; @@ -717,7 +809,9 @@ public class GroupCoordinatorShardTest { groupMetadataManager, offsetMetadataManager, timer, - mock(GroupCoordinatorConfig.class) + mock(GroupCoordinatorConfig.class), + mock(CoordinatorMetrics.class), + mock(CoordinatorMetricsShard.class) ); Record offsetCommitTombstone = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "topic", 0); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index ee44304072e..2dca0c2e352 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -90,6 +90,7 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generic.GenericGroup; import org.apache.kafka.coordinator.group.generic.GenericGroupMember; import org.apache.kafka.coordinator.group.generic.GenericGroupState; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -141,6 +142,9 @@ import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE; import static org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.GENERIC_GROUP_REBALANCES_SENSOR_NAME; import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -152,6 +156,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class GroupMetadataManagerTest { @@ -317,6 +322,7 @@ public class GroupMetadataManagerTest { final private int genericGroupNewMemberJoinTimeoutMs = 5 * 60 * 1000; private int genericGroupMinSessionTimeoutMs = 10; private int genericGroupMaxSessionTimeoutMs = 10 * 60 * 1000; + private GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class); public Builder withMetadataImage(MetadataImage metadataImage) { this.metadataImage = metadataImage; @@ -371,6 +377,7 @@ public class GroupMetadataManagerTest { time, timer, snapshotRegistry, + metrics, new GroupMetadataManager.Builder() .withSnapshotRegistry(snapshotRegistry) .withLogContext(logContext) @@ -387,6 +394,7 @@ public class GroupMetadataManagerTest { .withGenericGroupMaxSessionTimeoutMs(genericGroupMaxSessionTimeoutMs) .withGenericGroupInitialRebalanceDelayMs(genericGroupInitialRebalanceDelayMs) .withGenericGroupNewMemberJoinTimeoutMs(genericGroupNewMemberJoinTimeoutMs) + .withGroupCoordinatorMetricsShard(metrics) .build(), genericGroupInitialRebalanceDelayMs, genericGroupNewMemberJoinTimeoutMs @@ -405,6 +413,7 @@ public class GroupMetadataManagerTest { final MockTime time; final MockCoordinatorTimer timer; final SnapshotRegistry snapshotRegistry; + final GroupCoordinatorMetricsShard metrics; final GroupMetadataManager groupMetadataManager; final int genericGroupInitialRebalanceDelayMs; final int genericGroupNewMemberJoinTimeoutMs; @@ -416,6 +425,7 @@ public class GroupMetadataManagerTest { MockTime time, MockCoordinatorTimer timer, SnapshotRegistry snapshotRegistry, + GroupCoordinatorMetricsShard metrics, GroupMetadataManager groupMetadataManager, int genericGroupInitialRebalanceDelayMs, int genericGroupNewMemberJoinTimeoutMs @@ -423,6 +433,7 @@ public class GroupMetadataManagerTest { this.time = time; this.timer = timer; this.snapshotRegistry = snapshotRegistry; + this.metrics = metrics; this.groupMetadataManager = groupMetadataManager; this.genericGroupInitialRebalanceDelayMs = genericGroupInitialRebalanceDelayMs; this.genericGroupNewMemberJoinTimeoutMs = genericGroupNewMemberJoinTimeoutMs; @@ -3829,6 +3840,12 @@ public class GroupMetadataManagerTest { "group-id", STABLE, context.time, + new GroupCoordinatorMetricsShard( + context.snapshotRegistry, + Collections.emptyMap(), + Collections.emptyMap(), + new TopicPartition("__consumer_offsets", 0) + ), 1, Optional.of("consumer"), Optional.of("range"), @@ -9557,6 +9574,71 @@ public class GroupMetadataManagerTest { assertEquals(Collections.emptyList(), records); } + @Test + public void testGenericGroupCompletedRebalanceSensor() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id"); + verify(context.metrics).record(GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME); + } + + @Test + public void testGenericGroupRebalanceSensor() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.createGenericGroup("group-id"); + + context.joinGenericGroupAsDynamicMemberAndCompleteJoin( + new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build() + ); + verify(context.metrics).record(GENERIC_GROUP_REBALANCES_SENSOR_NAME); + } + + @Test + public void testConsumerGroupRebalanceSensor() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), + mkTopicAssignment(barTopicId, 0, 1, 2) + ))) + )); + + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + + verify(context.metrics).record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); + } + private static void assertNoOrEmptyResult(List> timeouts) { assertTrue(timeouts.size() <= 1); timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, timeout.result)); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 82fca8b0e25..792d1f62507 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -51,6 +51,7 @@ import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.generic.GenericGroup; import org.apache.kafka.coordinator.group.generic.GenericGroupMember; import org.apache.kafka.coordinator.group.generic.GenericGroupState; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -70,6 +71,9 @@ import java.util.OptionalInt; import java.util.OptionalLong; import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_DELETIONS_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -77,6 +81,8 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class OffsetMetadataManagerTest { @@ -89,6 +95,7 @@ public class OffsetMetadataManagerTest { private GroupMetadataManager groupMetadataManager = null; private MetadataImage metadataImage = null; private GroupCoordinatorConfig config = null; + private GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class); Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60 * 1000); @@ -119,6 +126,7 @@ public class OffsetMetadataManagerTest { .withLogContext(logContext) .withMetadataImage(metadataImage) .withConsumerGroupAssignors(Collections.singletonList(new RangeAssignor())) + .withGroupCoordinatorMetricsShard(metrics) .build(); } @@ -129,12 +137,14 @@ public class OffsetMetadataManagerTest { .withMetadataImage(metadataImage) .withGroupMetadataManager(groupMetadataManager) .withGroupCoordinatorConfig(config) + .withGroupCoordinatorMetricsShard(metrics) .build(); return new OffsetMetadataManagerTestContext( time, timer, snapshotRegistry, + metrics, groupMetadataManager, offsetMetadataManager ); @@ -144,6 +154,7 @@ public class OffsetMetadataManagerTest { final MockTime time; final MockCoordinatorTimer timer; final SnapshotRegistry snapshotRegistry; + final GroupCoordinatorMetricsShard metrics; final GroupMetadataManager groupMetadataManager; final OffsetMetadataManager offsetMetadataManager; @@ -154,12 +165,14 @@ public class OffsetMetadataManagerTest { MockTime time, MockCoordinatorTimer timer, SnapshotRegistry snapshotRegistry, + GroupCoordinatorMetricsShard metrics, GroupMetadataManager groupMetadataManager, OffsetMetadataManager offsetMetadataManager ) { this.time = time; this.timer = timer; this.snapshotRegistry = snapshotRegistry; + this.metrics = metrics; this.groupMetadataManager = groupMetadataManager; this.offsetMetadataManager = offsetMetadataManager; } @@ -1986,6 +1999,128 @@ public class OffsetMetadataManagerTest { assertNull(context.offsetMetadataManager.offset("foo", "bar", 0)); } + @Test + public void testOffsetCommitsSensor() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Add member. + group.add(mkGenericMember("member", Optional.of("new-instance-id"))); + + // Transition to next generation. + group.transitionTo(GenericGroupState.PREPARING_REBALANCE); + group.initNextGeneration(); + assertEquals(1, group.generationId()); + group.transitionTo(GenericGroupState.STABLE); + + CoordinatorResult result = context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(1) + .setRetentionTimeMs(1234L) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Arrays.asList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L), + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(150L) + )) + )) + ); + + verify(context.metrics).record(OFFSET_COMMITS_SENSOR_NAME, 2); + } + + @Test + public void testOffsetsExpiredSensor() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + Group group = mock(Group.class); + + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .withGroupMetadataManager(groupMetadataManager) + .withOffsetsRetentionMs(1000) + .build(); + + long commitTimestamp = context.time.milliseconds(); + + context.commitOffset("group-id", "firstTopic", 0, 100L, 0, commitTimestamp); + context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp); + context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500); + + context.time.sleep(1000); + + // firstTopic-0: group is still subscribed to firstTopic. Do not expire. + // secondTopic-0: should expire as offset retention has passed. + // secondTopic-1: has not passed offset retention. Do not expire. + List expectedRecords = Collections.singletonList( + RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 0) + ); + + when(groupMetadataManager.group("group-id")).thenReturn(group); + when(group.offsetExpirationCondition()).thenReturn(Optional.of( + new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs))); + when(group.isSubscribedToTopic("firstTopic")).thenReturn(true); + when(group.isSubscribedToTopic("secondTopic")).thenReturn(false); + + List records = new ArrayList<>(); + assertFalse(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(expectedRecords, records); + + // Expire secondTopic-1. + context.time.sleep(500); + + records = new ArrayList<>(); + assertFalse(context.cleanupExpiredOffsets("group-id", records)); + verify(context.metrics, times(2)).record(OFFSET_EXPIRED_SENSOR_NAME, 1); + + // Add 2 more commits, then expire all. + when(group.isSubscribedToTopic("firstTopic")).thenReturn(false); + context.commitOffset("group-id", "firstTopic", 1, 100L, 0, commitTimestamp + 500); + context.commitOffset("group-id", "secondTopic", 0, 101L, 0, commitTimestamp + 500); + + records = new ArrayList<>(); + assertTrue(context.cleanupExpiredOffsets("group-id", records)); + verify(context.metrics).record(OFFSET_EXPIRED_SENSOR_NAME, 3); + } + + @Test + public void testOffsetDeletionsSensor() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("foo", true); + + context.commitOffset("foo", "bar", 0, 100L, 0); + context.commitOffset("foo", "bar", 1, 150L, 0); + group.setSubscribedTopics(Optional.of(Collections.emptySet())); + + OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("bar") + .setPartitions(Arrays.asList( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0), + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(1) + )) + ).iterator()); + + context.deleteOffsets( + new OffsetDeleteRequestData() + .setGroupId("foo") + .setTopics(requestTopicCollection) + ); + + verify(context.metrics).record(OFFSET_DELETIONS_SENSOR_NAME, 2); + } + private void verifyReplay( OffsetMetadataManagerTestContext context, String groupId, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java index 03b11cbed6f..4c8690fb2ac 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.generic.GenericGroup; import org.apache.kafka.coordinator.group.generic.GenericGroupMember; import org.apache.kafka.coordinator.group.generic.GenericGroupState; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; import org.junit.jupiter.api.Test; @@ -91,6 +92,7 @@ import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; public class RecordHelpersTest { @@ -527,7 +529,8 @@ public class RecordHelpersTest { new LogContext(), "group-id", GenericGroupState.PREPARING_REBALANCE, - time + time, + mock(GroupCoordinatorMetricsShard.class) ); Map assignment = new HashMap<>(); @@ -597,7 +600,8 @@ public class RecordHelpersTest { new LogContext(), "group-id", GenericGroupState.PREPARING_REBALANCE, - time + time, + mock(GroupCoordinatorMetricsShard.class) ); expectedMembers.forEach(member -> { @@ -648,7 +652,8 @@ public class RecordHelpersTest { new LogContext(), "group-id", GenericGroupState.PREPARING_REBALANCE, - time + time, + mock(GroupCoordinatorMetricsShard.class) ); expectedMembers.forEach(member -> { @@ -707,7 +712,8 @@ public class RecordHelpersTest { new LogContext(), "group-id", GenericGroupState.PREPARING_REBALANCE, - time + time, + mock(GroupCoordinatorMetricsShard.class) ); group.initNextGeneration(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index df643a5d2f0..c9379fc2e7b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.coordinator.group.consumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.StaleMemberEpochException; @@ -26,6 +27,7 @@ import org.apache.kafka.coordinator.group.GroupMetadataManagerTest; import org.apache.kafka.coordinator.group.OffsetAndMetadata; import org.apache.kafka.coordinator.group.OffsetExpirationCondition; import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; @@ -46,12 +48,17 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; public class ConsumerGroupTest { private ConsumerGroup createConsumerGroup(String groupId) { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - return new ConsumerGroup(snapshotRegistry, groupId); + return new ConsumerGroup( + snapshotRegistry, + groupId, + mock(GroupCoordinatorMetricsShard.class) + ); } @Test @@ -642,7 +649,13 @@ public class ConsumerGroupTest { @Test public void testAsListedGroup() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo"); + GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard( + snapshotRegistry, + Collections.emptyMap(), + Collections.emptyMap(), + new TopicPartition("__consumer_offsets", 0) + ); + ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", metricsShard); snapshotRegistry.getOrCreateSnapshot(0); assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0)); group.updateMember(new ConsumerGroupMember.Builder("member1") @@ -656,7 +669,11 @@ public class ConsumerGroupTest { @Test public void testValidateOffsetFetch() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo"); + ConsumerGroup group = new ConsumerGroup( + snapshotRegistry, + "group-foo", + mock(GroupCoordinatorMetricsShard.class) + ); // Simulate a call from the admin client without member id and member epoch. group.validateOffsetFetch(null, -1, Long.MAX_VALUE); @@ -715,7 +732,7 @@ public class ConsumerGroupTest { long commitTimestamp = 20000L; long offsetsRetentionMs = 10000L; OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty()); - ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id"); + ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class)); Optional offsetExpirationCondition = group.offsetExpirationCondition(); assertTrue(offsetExpirationCondition.isPresent()); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java index 156fdf1b507..e5e71aaf2ab 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group.generic; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.GroupIdNotFoundException; @@ -36,6 +37,8 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.OffsetAndMetadata; import org.apache.kafka.coordinator.group.OffsetExpirationCondition; import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; +import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -59,6 +62,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; public class GenericGroupTest { private final String protocolType = "consumer"; @@ -68,12 +72,19 @@ public class GenericGroupTest { private final String clientHost = "clientHost"; private final int rebalanceTimeoutMs = 60000; private final int sessionTimeoutMs = 10000; + private final LogContext logContext = new LogContext(); + private final GroupCoordinatorMetricsShard metrics = new GroupCoordinatorMetricsShard( + new SnapshotRegistry(logContext), + Collections.emptyMap(), + Collections.emptyMap(), + new TopicPartition("__consumer_offsets", 0) + ); private GenericGroup group = null; @BeforeEach public void initialize() { - group = new GenericGroup(new LogContext(), "groupId", EMPTY, Time.SYSTEM); + group = new GenericGroup(logContext, "groupId", EMPTY, Time.SYSTEM, metrics); } @Test @@ -1099,7 +1110,7 @@ public class GenericGroupTest { OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty()); MockTime time = new MockTime(); long currentStateTimestamp = time.milliseconds(); - GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, time); + GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, time, mock(GroupCoordinatorMetricsShard.class)); // 1. Test no protocol type. Simple consumer case, Base timestamp based off of last commit timestamp. Optional offsetExpirationCondition = group.offsetExpirationCondition(); @@ -1174,7 +1185,7 @@ public class GenericGroupTest { @Test public void testIsSubscribedToTopic() { - GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, Time.SYSTEM); + GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, Time.SYSTEM, mock(GroupCoordinatorMetricsShard.class)); // 1. group has no protocol type => not subscribed assertFalse(group.isSubscribedToTopic("topic")); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java new file mode 100644 index 00000000000..b9d9df6e6b3 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import com.yammer.metrics.core.MetricsRegistry; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.generic.GenericGroup; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_ASSIGNING; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_DEAD; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_EMPTY; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_RECONCILING; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS_STABLE; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_DEAD; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_EMPTY; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_PREPARING_REBALANCE; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_GENERIC_GROUPS_STABLE; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_OFFSETS; +import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue; +import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.metricName; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class GroupCoordinatorMetricsShardTest { + + @Test + public void testLocalGauges() { + MetricsRegistry registry = new MetricsRegistry(); + Metrics metrics = new Metrics(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + TopicPartition tp = new TopicPartition("__consumer_offsets", 0); + GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics); + GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp); + + shard.incrementLocalGauge(NUM_OFFSETS); + shard.incrementLocalGauge(NUM_GENERIC_GROUPS); + shard.incrementLocalGauge(NUM_CONSUMER_GROUPS); + shard.incrementLocalGauge(NUM_CONSUMER_GROUPS_EMPTY); + shard.incrementLocalGauge(NUM_CONSUMER_GROUPS_ASSIGNING); + shard.incrementLocalGauge(NUM_CONSUMER_GROUPS_RECONCILING); + shard.incrementLocalGauge(NUM_CONSUMER_GROUPS_STABLE); + shard.incrementLocalGauge(NUM_CONSUMER_GROUPS_DEAD); + + snapshotRegistry.getOrCreateSnapshot(1000); + // The value should not be updated until the offset has been committed. + assertEquals(0, shard.localGaugeValue(NUM_OFFSETS)); + assertEquals(0, shard.localGaugeValue(NUM_GENERIC_GROUPS)); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS)); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY)); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_ASSIGNING)); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_RECONCILING)); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE)); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_DEAD)); + + shard.commitUpTo(1000); + assertEquals(1, shard.localGaugeValue(NUM_OFFSETS)); + assertEquals(1, shard.localGaugeValue(NUM_GENERIC_GROUPS)); + assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS)); + assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY)); + assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_ASSIGNING)); + assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_RECONCILING)); + assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE)); + assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_DEAD)); + + shard.decrementLocalGauge(NUM_OFFSETS); + shard.decrementLocalGauge(NUM_GENERIC_GROUPS); + shard.decrementLocalGauge(NUM_CONSUMER_GROUPS); + shard.decrementLocalGauge(NUM_CONSUMER_GROUPS_EMPTY); + shard.decrementLocalGauge(NUM_CONSUMER_GROUPS_ASSIGNING); + shard.decrementLocalGauge(NUM_CONSUMER_GROUPS_RECONCILING); + shard.decrementLocalGauge(NUM_CONSUMER_GROUPS_STABLE); + shard.decrementLocalGauge(NUM_CONSUMER_GROUPS_DEAD); + + snapshotRegistry.getOrCreateSnapshot(2000); + shard.commitUpTo(2000); + assertEquals(0, shard.localGaugeValue(NUM_OFFSETS)); + assertEquals(0, shard.localGaugeValue(NUM_GENERIC_GROUPS)); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS)); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY)); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_ASSIGNING)); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_RECONCILING)); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE)); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_DEAD)); + } + + @Test + public void testGenericGroupStateTransitionMetrics() { + MetricsRegistry registry = new MetricsRegistry(); + Metrics metrics = new Metrics(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + TopicPartition tp = new TopicPartition("__consumer_offsets", 0); + GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics); + GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp); + coordinatorMetrics.activateMetricsShard(shard); + + LogContext logContext = new LogContext(); + GenericGroup group0 = new GenericGroup(logContext, "groupId0", EMPTY, Time.SYSTEM, shard); + GenericGroup group1 = new GenericGroup(logContext, "groupId1", EMPTY, Time.SYSTEM, shard); + GenericGroup group2 = new GenericGroup(logContext, "groupId2", EMPTY, Time.SYSTEM, shard); + GenericGroup group3 = new GenericGroup(logContext, "groupId3", EMPTY, Time.SYSTEM, shard); + + snapshotRegistry.getOrCreateSnapshot(1000); + shard.commitUpTo(1000); + assertEquals(4, shard.localGaugeValue(NUM_GENERIC_GROUPS)); + + group0.transitionTo(PREPARING_REBALANCE); + group0.transitionTo(COMPLETING_REBALANCE); + group1.transitionTo(PREPARING_REBALANCE); + group2.transitionTo(DEAD); + + snapshotRegistry.getOrCreateSnapshot(2000); + shard.commitUpTo(2000); + assertEquals(1, shard.globalGaugeValue(NUM_GENERIC_GROUPS_EMPTY)); + assertEquals(1, shard.globalGaugeValue(NUM_GENERIC_GROUPS_PREPARING_REBALANCE)); + assertEquals(1, shard.globalGaugeValue(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE)); + assertEquals(1, shard.globalGaugeValue(NUM_GENERIC_GROUPS_DEAD)); + assertEquals(0, shard.globalGaugeValue(NUM_GENERIC_GROUPS_STABLE)); + + group0.transitionTo(STABLE); + group1.transitionTo(COMPLETING_REBALANCE); + group3.transitionTo(DEAD); + + snapshotRegistry.getOrCreateSnapshot(3000); + shard.commitUpTo(3000); + assertEquals(0, shard.globalGaugeValue(NUM_GENERIC_GROUPS_EMPTY)); + assertEquals(0, shard.globalGaugeValue(NUM_GENERIC_GROUPS_PREPARING_REBALANCE)); + assertEquals(1, shard.globalGaugeValue(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE)); + assertEquals(2, shard.globalGaugeValue(NUM_GENERIC_GROUPS_DEAD)); + assertEquals(1, shard.globalGaugeValue(NUM_GENERIC_GROUPS_STABLE)); + + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroups"), 4); + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroupsEmpty"), 0); + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroupsPreparingRebalance"), 0); + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroupsCompletingRebalance"), 1); + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroupsDead"), 2); + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroupsStable"), 1); + } + + @Test + public void testConsumerGroupStateTransitionMetrics() { + MetricsRegistry registry = new MetricsRegistry(); + Metrics metrics = new Metrics(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + TopicPartition tp = new TopicPartition("__consumer_offsets", 0); + GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics); + GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard(snapshotRegistry, tp); + coordinatorMetrics.activateMetricsShard(shard); + + ConsumerGroup group0 = new ConsumerGroup( + snapshotRegistry, + "group-0", + shard + ); + ConsumerGroup group1 = new ConsumerGroup( + snapshotRegistry, + "group-1", + shard + ); + ConsumerGroup group2 = new ConsumerGroup( + snapshotRegistry, + "group-2", + shard + ); + ConsumerGroup group3 = new ConsumerGroup( + snapshotRegistry, + "group-3", + shard + ); + + snapshotRegistry.getOrCreateSnapshot(1000); + shard.commitUpTo(1000); + assertEquals(4, shard.localGaugeValue(NUM_CONSUMER_GROUPS)); + assertEquals(4, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY)); + + ConsumerGroupMember member0 = group0.getOrMaybeCreateMember("member-id", true); + ConsumerGroupMember member1 = group1.getOrMaybeCreateMember("member-id", true); + ConsumerGroupMember member2 = group2.getOrMaybeCreateMember("member-id", true); + ConsumerGroupMember member3 = group3.getOrMaybeCreateMember("member-id", true); + group0.updateMember(member0); + group1.updateMember(member1); + group2.updateMember(member2); + group3.updateMember(member3); + + snapshotRegistry.getOrCreateSnapshot(2000); + shard.commitUpTo(2000); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY)); + assertEquals(4, shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE)); + + group2.setGroupEpoch(1); + group3.setGroupEpoch(1); + + snapshotRegistry.getOrCreateSnapshot(3000); + shard.commitUpTo(3000); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY)); + assertEquals(2, shard.localGaugeValue(NUM_CONSUMER_GROUPS_ASSIGNING)); + assertEquals(2, shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE)); + + group2.setTargetAssignmentEpoch(1); + // Set member2 to ASSIGNING state. + new ConsumerGroupMember.Builder(member2) + .setPartitionsPendingAssignment(Collections.singletonMap(Uuid.ZERO_UUID, Collections.singleton(0))) + .build(); + + snapshotRegistry.getOrCreateSnapshot(4000); + shard.commitUpTo(4000); + assertEquals(0, shard.localGaugeValue(NUM_CONSUMER_GROUPS_EMPTY)); + assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_ASSIGNING)); + assertEquals(1, shard.localGaugeValue(NUM_CONSUMER_GROUPS_RECONCILING)); + assertEquals(2, shard.localGaugeValue(NUM_CONSUMER_GROUPS_STABLE)); + + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroups"), 4); + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroupsEmpty"), 0); + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroupsAssigning"), 1); + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroupsReconciling"), 1); + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroupsStable"), 2); + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroupsDead"), 0); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java new file mode 100644 index 00000000000..9b49a633ef3 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import com.yammer.metrics.core.MetricsRegistry; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.stream.IntStream; + +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.GENERIC_GROUP_REBALANCES_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_CONSUMER_GROUPS; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.NUM_OFFSETS; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue; +import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertMetricsForTypeEqual; +import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.metricName; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class GroupCoordinatorMetricsTest { + + @Test + public void testMetricNames() { + MetricsRegistry registry = new MetricsRegistry(); + Metrics metrics = new Metrics(); + + HashSet expectedMetrics = new HashSet<>(Arrays.asList( + metrics.metricName("offset-commit-rate", GroupCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("offset-commit-count", GroupCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("offset-expiration-rate", GroupCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("offset-expiration-count", GroupCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("offset-deletion-rate", GroupCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("offset-deletion-count", GroupCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("group-completed-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("group-completed-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("consumer-group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("consumer-group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP) + )); + + try { + try (GroupCoordinatorMetrics ignored = new GroupCoordinatorMetrics(registry, metrics)) { + HashSet expectedRegistry = new HashSet<>(Arrays.asList( + "kafka.coordinator.group:type=GroupMetadataManager,name=NumOffsets", + "kafka.coordinator.group:type=GroupMetadataManager,name=NumGroups", + "kafka.coordinator.group:type=GroupMetadataManager,name=NumGroupsPreparingRebalance", + "kafka.coordinator.group:type=GroupMetadataManager,name=NumGroupsCompletingRebalance", + "kafka.coordinator.group:type=GroupMetadataManager,name=NumGroupsStable", + "kafka.coordinator.group:type=GroupMetadataManager,name=NumGroupsDead", + "kafka.coordinator.group:type=GroupMetadataManager,name=NumGroupsEmpty", + "kafka.coordinator.group:type=GroupMetadataManager,name=NumConsumerGroups", + "kafka.coordinator.group:type=GroupMetadataManager,name=NumConsumerGroupsEmpty", + "kafka.coordinator.group:type=GroupMetadataManager,name=NumConsumerGroupsAssigning", + "kafka.coordinator.group:type=GroupMetadataManager,name=NumConsumerGroupsReconciling", + "kafka.coordinator.group:type=GroupMetadataManager,name=NumConsumerGroupsStable", + "kafka.coordinator.group:type=GroupMetadataManager,name=NumConsumerGroupsDead" + )); + + assertMetricsForTypeEqual(registry, "kafka.coordinator.group", expectedRegistry); + expectedMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName))); + } + assertMetricsForTypeEqual(registry, "kafka.coordinator.group", Collections.emptySet()); + expectedMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName))); + } finally { + registry.shutdown(); + } + } + + @Test + public void sumLocalGauges() { + MetricsRegistry registry = new MetricsRegistry(); + Metrics metrics = new Metrics(); + GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics); + SnapshotRegistry snapshotRegistry0 = new SnapshotRegistry(new LogContext()); + SnapshotRegistry snapshotRegistry1 = new SnapshotRegistry(new LogContext()); + TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0); + TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1); + GroupCoordinatorMetricsShard shard0 = coordinatorMetrics.newMetricsShard(snapshotRegistry0, tp0); + GroupCoordinatorMetricsShard shard1 = coordinatorMetrics.newMetricsShard(snapshotRegistry1, tp1); + coordinatorMetrics.activateMetricsShard(shard0); + coordinatorMetrics.activateMetricsShard(shard1); + + IntStream.range(0, 5).forEach(__ -> shard0.incrementLocalGauge(NUM_CONSUMER_GROUPS)); + IntStream.range(0, 5).forEach(__ -> shard1.incrementLocalGauge(NUM_CONSUMER_GROUPS)); + IntStream.range(0, 3).forEach(__ -> shard1.decrementLocalGauge(NUM_CONSUMER_GROUPS)); + + IntStream.range(0, 6).forEach(__ -> shard0.incrementLocalGauge(NUM_OFFSETS)); + IntStream.range(0, 2).forEach(__ -> shard1.incrementLocalGauge(NUM_OFFSETS)); + IntStream.range(0, 1).forEach(__ -> shard1.decrementLocalGauge(NUM_OFFSETS)); + + snapshotRegistry0.getOrCreateSnapshot(1000); + snapshotRegistry1.getOrCreateSnapshot(1500); + shard0.commitUpTo(1000); + shard1.commitUpTo(1500); + + assertEquals(5, shard0.localGaugeValue(NUM_CONSUMER_GROUPS)); + assertEquals(2, shard1.localGaugeValue(NUM_CONSUMER_GROUPS)); + assertEquals(6, shard0.localGaugeValue(NUM_OFFSETS)); + assertEquals(1, shard1.localGaugeValue(NUM_OFFSETS)); + assertEquals(7, coordinatorMetrics.numConsumerGroups()); + assertEquals(7, coordinatorMetrics.numOffsets()); + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumConsumerGroups"), 7); + assertGaugeValue(registry, metricName("GroupMetadataManager", "NumOffsets"), 7); + } + + @Test + public void testGlobalSensors() { + MetricsRegistry registry = new MetricsRegistry(); + Time time = new MockTime(); + Metrics metrics = new Metrics(time); + GroupCoordinatorMetrics coordinatorMetrics = new GroupCoordinatorMetrics(registry, metrics); + GroupCoordinatorMetricsShard shard = coordinatorMetrics.newMetricsShard( + new SnapshotRegistry(new LogContext()), new TopicPartition("__consumer_offsets", 0) + ); + + shard.record(GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, 10); + assertMetricValue(metrics, metrics.metricName("group-completed-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP), 1.0 / 3.0); + assertMetricValue(metrics, metrics.metricName("group-completed-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP), 10); + + shard.record(OFFSET_COMMITS_SENSOR_NAME, 20); + assertMetricValue(metrics, metrics.metricName("offset-commit-rate", GroupCoordinatorMetrics.METRICS_GROUP), 2.0 / 3.0); + assertMetricValue(metrics, metrics.metricName("offset-commit-count", GroupCoordinatorMetrics.METRICS_GROUP), 20); + + shard.record(OFFSET_EXPIRED_SENSOR_NAME, 30); + assertMetricValue(metrics, metrics.metricName("offset-expiration-rate", GroupCoordinatorMetrics.METRICS_GROUP), 1.0); + assertMetricValue(metrics, metrics.metricName("offset-expiration-count", GroupCoordinatorMetrics.METRICS_GROUP), 30); + + shard.record(GENERIC_GROUP_REBALANCES_SENSOR_NAME, 40); + assertMetricValue(metrics, metrics.metricName("group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP), 4.0 / 3.0); + assertMetricValue(metrics, metrics.metricName("group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP), 40); + + shard.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME, 50); + assertMetricValue(metrics, metrics.metricName("consumer-group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP), 5.0 / 3.0); + assertMetricValue(metrics, metrics.metricName("consumer-group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP), 50); + } + + private void assertMetricValue(Metrics metrics, MetricName metricName, double val) { + assertEquals(val, metrics.metric(metricName).metricValue()); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/MetricsTestUtils.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/MetricsTestUtils.java new file mode 100644 index 00000000000..8cd67a61b92 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/MetricsTestUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; + +import java.util.Set; +import java.util.TreeSet; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class MetricsTestUtils { + static void assertMetricsForTypeEqual( + MetricsRegistry registry, + String expectedPrefix, + Set expected + ) { + Set actual = new TreeSet<>(); + registry.allMetrics().forEach((name, __) -> { + StringBuilder bld = new StringBuilder(); + bld.append(name.getGroup()); + bld.append(":type=").append(name.getType()); + bld.append(",name=").append(name.getName()); + if (bld.toString().startsWith(expectedPrefix)) { + actual.add(bld.toString()); + } + }); + assertEquals(new TreeSet<>(expected), actual); + } + + static void assertGaugeValue(MetricsRegistry registry, MetricName metricName, long count) { + Gauge gauge = (Gauge) registry.allMetrics().get(metricName); + + assertEquals(count, (long) gauge.value()); + } + + static MetricName metricName(String type, String name) { + String mBeanName = String.format("kafka.coordinator.group:type=%s,name=%s", type, name); + return new MetricName("kafka.coordinator.group", type, name, null, mBeanName); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index bec98939d19..e13f27d13a4 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.NotCoordinatorException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -245,6 +247,12 @@ public class CoordinatorRuntimeTest { return this; } + @Override + public CoordinatorShardBuilder withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) { + return this; + } + + @Override public CoordinatorShardBuilder withTopicPartition( TopicPartition topicPartition ) { @@ -288,12 +296,14 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withLogContext(any())).thenReturn(builder); when(builder.withTime(any())).thenReturn(builder); when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); when(builder.withTopicPartition(any())).thenReturn(builder); when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); @@ -353,12 +363,14 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withLogContext(any())).thenReturn(builder); when(builder.withTime(any())).thenReturn(builder); when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); when(builder.withTopicPartition(any())).thenReturn(builder); when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); @@ -405,12 +417,14 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(supplier) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withLogContext(any())).thenReturn(builder); when(builder.withTime(any())).thenReturn(builder); when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); when(builder.withTopicPartition(any())).thenReturn(builder); when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); @@ -455,12 +469,14 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(supplier) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withLogContext(any())).thenReturn(builder); when(builder.withTime(any())).thenReturn(builder); when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); when(builder.withTopicPartition(any())).thenReturn(builder); when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); @@ -522,12 +538,14 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withLogContext(any())).thenReturn(builder); when(builder.withTime(any())).thenReturn(builder); when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); when(builder.withTopicPartition(any())).thenReturn(builder); when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); @@ -571,6 +589,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -610,6 +629,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(supplier) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -618,6 +638,7 @@ public class CoordinatorRuntimeTest { when(builder.withTimer(any())).thenReturn(builder); when(builder.withTime(any())).thenReturn(builder); when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); when(builder.withTopicPartition(any())).thenReturn(builder); when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); @@ -649,6 +670,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Schedule the loading. @@ -758,6 +780,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Scheduling a write fails with a NotCoordinatorException because the coordinator @@ -779,6 +802,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Loads the coordinator. @@ -804,6 +828,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Loads the coordinator. @@ -850,6 +875,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Loads the coordinator. @@ -898,6 +924,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Loads the coordinator. @@ -953,6 +980,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Schedule a read. It fails because the coordinator does not exist. @@ -975,6 +1003,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Loads the coordinator. @@ -1017,6 +1046,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Loads the coordinator. @@ -1083,6 +1113,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); MockCoordinatorShard coordinator0 = mock(MockCoordinatorShard.class); @@ -1093,6 +1124,7 @@ public class CoordinatorRuntimeTest { when(builder.withLogContext(any())).thenReturn(builder); when(builder.withTime(any())).thenReturn(builder); when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); when(builder.withTopicPartition(any())).thenReturn(builder); when(builder.withTime(any())).thenReturn(builder); when(builder.build()) @@ -1139,6 +1171,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Loads the coordinator. @@ -1191,6 +1224,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Loads the coordinator. @@ -1263,6 +1297,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Loads the coordinator. @@ -1332,6 +1367,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Loads the coordinator. @@ -1389,6 +1425,7 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); // Loads the coordinator. @@ -1436,12 +1473,14 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) .withCoordinatorRuntimeMetrics(runtimeMetrics) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withLogContext(any())).thenReturn(builder); when(builder.withTime(any())).thenReturn(builder); when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); when(builder.withTopicPartition(any())).thenReturn(builder); when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); @@ -1506,12 +1545,14 @@ public class CoordinatorRuntimeTest { .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) .withCoordinatorRuntimeMetrics(runtimeMetrics) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); when(builder.withLogContext(any())).thenReturn(builder); when(builder.withTime(any())).thenReturn(builder); when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); when(builder.withTopicPartition(any())).thenReturn(builder); when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder);